diff --git a/Jenkinsfile2 b/Jenkinsfile2 index f4dcdb242e..c41e739bd3 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -315,7 +315,9 @@ def pre_test_build_win() { python.exe -m pip install --upgrade pip python -m pip uninstall taospy -y python -m pip install taospy==2.7.10 - xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32 + python -m pip uninstall taos-ws-py -y + python -m pip install taos-ws-py==0.2.8 + xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32 ''' return 1 } diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000000..be2be525ba --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,5 @@ +# Security Policy + +## Reporting a Vulnerability + +Please submit CVE to https://github.com/taosdata/TDengine/security/advisories. diff --git a/cmake/mxml_CMakeLists.txt.in b/cmake/mxml_CMakeLists.txt.in index 1ac90ebdd4..762df40e10 100644 --- a/cmake/mxml_CMakeLists.txt.in +++ b/cmake/mxml_CMakeLists.txt.in @@ -1,7 +1,7 @@ # cos ExternalProject_Add(mxml GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git - GIT_TAG release-2.12 + GIT_TAG v2.12 SOURCE_DIR "${TD_CONTRIB_DIR}/mxml" #BINARY_DIR "" BUILD_IN_SOURCE TRUE diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 9d2b54dab3..340a3e917b 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -292,11 +292,11 @@ CONCAT_WS(separator_expr, expr1, expr2 [, expr] ...) LENGTH(expr) ``` -**Description**: The length in bytes of a string +**Description**: The length in bytes **Return value type**: Bigint -**Applicable data types**: VARCHAR and NCHAR fields or columns +**Applicable data types**: VARCHAR and NCHAR and VARBINARY **Nested query**: It can be used in both the outer query and inner query in a nested query. 
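The LENGTH documentation change above broadens the applicable types from VARCHAR and NCHAR columns to VARBINARY as well, while the result stays a byte count. A minimal sketch of the documented behavior, assuming a hypothetical table and column names that are not part of this patch:

```sql
-- Illustrative only: demo_len and its columns are hypothetical, not from this patch.
CREATE TABLE demo_len (ts TIMESTAMP, c_vc VARCHAR(32), c_nc NCHAR(32), c_vb VARBINARY(32));

-- LENGTH now accepts VARCHAR, NCHAR, and VARBINARY and reports the size in bytes;
-- CHAR_LENGTH remains the character-count counterpart for string types.
SELECT LENGTH(c_vc), LENGTH(c_nc), LENGTH(c_vb) FROM demo_len;
```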
diff --git a/docs/zh/02-intro.md b/docs/zh/02-intro.md index 68a2541717..bb989f27da 100644 --- a/docs/zh/02-intro.md +++ b/docs/zh/02-intro.md @@ -10,7 +10,7 @@ TDengine 是一款开源、高性能、云原生的[时序数据库](https://tde ## 主要产品 -TDengine 有三个主要产品:TDengine Pro (即 TDengine 企业版),TDengine Cloud,和 TDengine OSS,关于它们的具体定义请参考 +TDengine 有三个主要产品:TDengine Enterprise (即 TDengine 企业版),TDengine Cloud,和 TDengine OSS,关于它们的具体定义请参考 - [TDengine 企业版](https://www.taosdata.com/tdengine-pro) - [TDengine 云服务](https://cloud.taosdata.com/?utm_source=menu&utm_medium=webcn) - [TDengine 开源版](https://www.taosdata.com/tdengine-oss) diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index 0ff00d1710..64e9a3daed 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -1004,7 +1004,7 @@ TaosConsumer consumer = new TaosConsumer<>(config); - httpConnectTimeout: 创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。 - messageWaitTimeout: 数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。 - httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。 - 其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group) + 其他参数请参考:[Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group) #### 订阅消费数据 @@ -1082,7 +1082,7 @@ consumer.unsubscribe(); consumer.close() ``` -详情请参考:[数据订阅](../../../develop/tmq) +详情请参考:[数据订阅](../../develop/tmq) #### 完整示例 @@ -1373,7 +1373,7 @@ public static void main(String[] args) throws Exception { **解决方法**: 更换 taos-jdbcdriver 3.0.2+ 版本。 -其它问题请参考 [FAQ](../../../train-faq/faq) +其它问题请参考 [FAQ](../../train-faq/faq) ## API 参考 diff --git a/docs/zh/08-connector/26-rust.mdx b/docs/zh/08-connector/26-rust.mdx index 3e51aa72bb..018552117e 100644 --- a/docs/zh/08-connector/26-rust.mdx +++ b/docs/zh/08-connector/26-rust.mdx @@ -352,7 +352,7 @@ client.put(&sml_data)? ### 数据订阅 -TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。 +TDengine 通过消息队列 [TMQ](../../taos-sql/tmq/) 启动一个订阅。 #### 创建 Topic @@ -491,7 +491,7 @@ let taos = pool.get()?; ## 常见问题 -请参考 [FAQ](../../../train-faq/faq) +请参考 [FAQ](../../train-faq/faq) ## API 参考 diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index cfec71934c..8b87a18e54 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -292,11 +292,11 @@ CONCAT_WS(separator_expr, expr1, expr2 [, expr] ...) 
LENGTH(expr) ``` -**功能说明**:以字节计数的字符串长度。 +**功能说明**:以字节计数的长度。 **返回结果类型**:BIGINT。 -**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。 +**适用数据类型**:VARCHAR, NCHAR, VARBINARY。 **嵌套子查询支持**:适用于内层查询和外层查询。 diff --git a/docs/zh/14-reference/05-taosbenchmark.md b/docs/zh/14-reference/05-taosbenchmark.md index d5ae95b20b..e42d921dc7 100644 --- a/docs/zh/14-reference/05-taosbenchmark.md +++ b/docs/zh/14-reference/05-taosbenchmark.md @@ -13,7 +13,7 @@ taosBenchmark (曾用名 taosdemo ) 是一个用于测试 TDengine 产品性能 taosBenchmark 有两种安装方式: -- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考[ TDengine 安装](../../operation/pkg-install)。 +- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考 [TDengine 安装](../../get-started/)。 - 单独编译 taos-tools 并安装, 详情请参考 [taos-tools](https://github.com/taosdata/taos-tools) 仓库。 diff --git a/docs/zh/14-reference/14-taosKeeper.md b/docs/zh/14-reference/14-taosKeeper.md index 738d351d45..92c85e7a2c 100644 --- a/docs/zh/14-reference/14-taosKeeper.md +++ b/docs/zh/14-reference/14-taosKeeper.md @@ -16,7 +16,7 @@ taosKeeper 是 TDengine 3.0 版本监控指标的导出工具,通过简单的 taosKeeper 有两种安装方式: taosKeeper 安装方式: -- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../operation/pkg-install)。 +- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../get-started/)。 - 单独编译 taosKeeper 并安装,详情请参考 [taosKeeper](https://github.com/taosdata/taoskeeper) 仓库。 diff --git a/docs/zh/20-third-party/11-kafka.md b/docs/zh/20-third-party/11-kafka.md index 2f09e50f0b..3d8822fdae 100644 --- a/docs/zh/20-third-party/11-kafka.md +++ b/docs/zh/20-third-party/11-kafka.md @@ -23,7 +23,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送 1. Linux 操作系统 2. 已安装 Java 8 和 Maven 3. 已安装 Git、curl、vi -4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../operation/pkg-install) +4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../get-started/) ## 安装 Kafka diff --git a/include/common/ttime.h b/include/common/ttime.h index de74e48100..37e3045817 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -75,7 +75,7 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) { int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval); -int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision); +int32_t taosTimeCountIntervalForFill(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision, int32_t order); int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision); int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 99160f1519..ea3524d12d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -41,16 +41,16 @@ enum { STREAM_STATUS__PAUSE, }; -enum { +typedef enum ETaskStatus { TASK_STATUS__NORMAL = 0, TASK_STATUS__DROPPING, - TASK_STATUS__FAIL, + TASK_STATUS__UNINIT, // not used, an placeholder TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore -}; +} ETaskStatus; enum { TASK_SCHED_STATUS__INACTIVE = 1, diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 408a5664a8..19efa7b169 100755 --- 
a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -33,17 +33,14 @@ adapterName="taosadapter" benchmarkName="taosBenchmark" dumpName="taosdump" demoName="taosdemo" -xname="taosx" clientName2="taos" serverName2="${clientName2}d" configFile2="${clientName2}.cfg" productName2="TDengine" emailName2="taosdata.com" -xname2="${clientName2}x" adapterName2="${clientName2}adapter" -explorerName="${clientName2}-explorer" benchmarkName2="${clientName2}Benchmark" demoName2="${clientName2}demo" dumpName2="${clientName2}dump" @@ -218,8 +215,6 @@ function install_bin() { ${csudo}rm -f ${bin_link_dir}/${demoName2} || : ${csudo}rm -f ${bin_link_dir}/${benchmarkName2} || : ${csudo}rm -f ${bin_link_dir}/${dumpName2} || : - ${csudo}rm -f ${bin_link_dir}/${xname2} || : - ${csudo}rm -f ${bin_link_dir}/${explorerName} || : ${csudo}rm -f ${bin_link_dir}/set_core || : ${csudo}rm -f ${bin_link_dir}/TDinsight.sh || : @@ -233,8 +228,6 @@ function install_bin() { [ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${demoName2} || : [ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${benchmarkName2} || : [ -x ${install_main_dir}/bin/${dumpName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${dumpName2} ${bin_link_dir}/${dumpName2} || : - [ -x ${install_main_dir}/bin/${xname2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${xname2} ${bin_link_dir}/${xname2} || : - [ -x ${install_main_dir}/bin/${explorerName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${explorerName} ${bin_link_dir}/${explorerName} || : [ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || : if [ "$clientName2" == "${clientName}" ]; then [ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || : @@ -703,26 +696,6 @@ function clean_service_on_systemd() { # if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then # ${csudo}rm -f ${service_config_dir}/${serverName2}.service # fi - x_service_config="${service_config_dir}/${xName2}.service" - if [ -e "$x_service_config" ]; then - if systemctl is-active --quiet ${xName2}; then - echo "${productName2} ${xName2} is running, stopping it..." - ${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null - fi - ${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null - ${csudo}rm -f ${x_service_config} - fi - - explorer_service_config="${service_config_dir}/${explorerName2}.service" - if [ -e "$explorer_service_config" ]; then - if systemctl is-active --quiet ${explorerName2}; then - echo "${productName2} ${explorerName2} is running, stopping it..." - ${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null - fi - ${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null - ${csudo}rm -f ${explorer_service_config} - ${csudo}rm -f /etc/${clientName2}/explorer.toml - fi } function install_service_on_systemd() { diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index 655629b92c..9e70a6bbf1 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -338,7 +338,7 @@ if [ "$verMode" == "cluster" ]; then tmp_pwd=`pwd` cd ${install_dir}/connector if [ ! 
-d taos-connector-jdbc ];then - git clone -b 3.2.1 --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||: + git clone -b main --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||: fi cd taos-connector-jdbc mvn clean package -Dmaven.test.skip=true diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b555f4e683..dbddf9cac6 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -380,8 +380,7 @@ void destroySubRequests(SRequestObj *pRequest) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, - pTmp->requestId); + tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId); break; } } @@ -398,7 +397,7 @@ void destroySubRequests(SRequestObj *pRequest) { removeRequest(pTmp->self); releaseRequest(pTmp->self); } else { - tscError("0x%" PRIx64 " is not there", tmpRefId); + tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId); break; } } @@ -492,8 +491,7 @@ void stopAllQueries(SRequestObj *pRequest) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, - pTmp->requestId); + tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId); break; } } @@ -512,7 +510,7 @@ void stopAllQueries(SRequestObj *pRequest) { taosStopQueryImpl(pTmp); releaseRequest(pTmp->self); } else { - tscError("0x%" PRIx64 " is not there", tmpRefId); + tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId); break; } } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 73b4ec2a74..ac7a6e6646 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -874,8 +874,13 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult if (TSDB_CODE_SUCCESS == code) { code = cloneCatalogReq(&pNewWrapper->pCatalogReq, pWrapper->pCatalogReq); } - doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code); - nodesDestroyNode(pRoot); + if (TSDB_CODE_SUCCESS == code) { + doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code); + nodesDestroyNode(pRoot); + } else { + handleQueryAnslyseRes(pWrapper, pResultMeta, code); + return; + } } void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) { @@ -1148,8 +1153,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { pReqList[++reqIdx] = pTmp; releaseRequest(tmpRefId); } else { - tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId, - pTmp->requestId); + tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId); break; } } @@ -1162,7 +1166,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { removeRequest(pTmp->self); releaseRequest(pTmp->self); } else { - tscError("0x%" PRIx64 " is not there", tmpRefId); + tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId); break; } } diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index e9313e0591..7d65ac424f 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -692,34 +692,67 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision) + fraction); } -int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) { +/** + * @brief calc how many windows after 
filling between skey and ekey + * @notes for asc order + * skey ---> ekey + * ^ ^ + * _____!_____.........._____|_____.. + * |__1__) + * |__2__)...-->|_ret+1_) + * skey + ret * interval <= ekey + * skey + ret * interval + interval > ekey + * ======> (ekey - skey - interval) / interval < ret <= (ekey - skey) / interval + * For keys from blocks which do not need filling, skey + ret * interval == ekey. + * For keys need filling, skey + ret * interval <= ekey. + * Total num of windows is ret + 1(the last window) + * + * for desc order + * skey <--- ekey + * ^ ^ + * _____|____..........______!____... + * |_first_) + * |__1__) + * |_ret_)<--...|__2__) + * skey >= ekey - ret * interval + * skey < ekey - ret * interval + interval + *=======> (ekey - skey) / interval <= ret < (ekey - skey + interval) / interval + * For keys from blocks which do not need filling, skey == ekey - ret * interval. + * For keys need filling, skey >= ekey - ret * interval. + * Total num of windows is ret + 1(the first window) + */ +int32_t taosTimeCountIntervalForFill(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision, + int32_t order) { if (ekey < skey) { int64_t tmp = ekey; ekey = skey; skey = tmp; } + int32_t ret; if (unit != 'n' && unit != 'y') { - return (int32_t)((ekey - skey) / interval); + ret = (int32_t)((ekey - skey) / interval); + if (order == TSDB_ORDER_DESC && ret * interval < (ekey - skey)) ret += 1; + } else { + skey /= (int64_t)(TSDB_TICK_PER_SECOND(precision)); + ekey /= (int64_t)(TSDB_TICK_PER_SECOND(precision)); + + struct tm tm; + time_t t = (time_t)skey; + taosLocalTime(&t, &tm, NULL); + int32_t smon = tm.tm_year * 12 + tm.tm_mon; + + t = (time_t)ekey; + taosLocalTime(&t, &tm, NULL); + int32_t emon = tm.tm_year * 12 + tm.tm_mon; + + if (unit == 'y') { + interval *= 12; + } + ret = (emon - smon) / (int32_t)interval; + if (order == TSDB_ORDER_DESC && ret * interval < (smon - emon)) ret += 1; } - - skey /= (int64_t)(TSDB_TICK_PER_SECOND(precision)); - ekey /= (int64_t)(TSDB_TICK_PER_SECOND(precision)); - - struct tm tm; - time_t t = (time_t)skey; - taosLocalTime(&t, &tm, NULL); - int32_t smon = tm.tm_year * 12 + tm.tm_mon; - - t = (time_t)ekey; - taosLocalTime(&t, &tm, NULL); - int32_t emon = tm.tm_year * 12 + tm.tm_mon; - - if (unit == 'y') { - interval *= 12; - } - - return (emon - smon) / (int32_t)interval; + return ret + 1; } int64_t taosTimeTruncate(int64_t ts, const SInterval* pInterval) { diff --git a/source/dnode/mnode/impl/src/mndIndex.c b/source/dnode/mnode/impl/src/mndIndex.c index 2e78116a86..041cc664e5 100644 --- a/source/dnode/mnode/impl/src/mndIndex.c +++ b/source/dnode/mnode/impl/src/mndIndex.c @@ -439,7 +439,7 @@ static int32_t mndProcessCreateIdxReq(SRpcMsg *pReq) { pDb = mndAcquireDbByStb(pMnode, createReq.stbName); if (pDb == NULL) { - terrno = TSDB_CODE_MND_INVALID_DB; + terrno = TSDB_CODE_MND_DB_NOT_EXIST; goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 9847024bee..bb3f92be71 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -259,7 +259,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { if (pDb == NULL) { if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) && (0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) { - terrno = TSDB_CODE_MND_INVALID_DB; + terrno = TSDB_CODE_MND_DB_NOT_EXIST; mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, terrstr()); goto _OVER; @@ -313,7 
+313,7 @@ _CONNECT: sprintf(obj, "%s:%d", ip, pConn->port); char detail[1000] = {0}; - sprintf(detail, "connType:%d, db:%s, pid:%d, startTime:%" PRId64 ", sVer:%s, app:%s", + sprintf(detail, "connType:%d, db:%s, pid:%d, startTime:%" PRId64 ", sVer:%s, app:%s", connReq.connType, connReq.db, connReq.pid, connReq.startTime, connReq.sVer, connReq.app); auditRecord(pReq, pMnode->clusterId, "login", connReq.user, obj, detail); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 431864ae97..3dddda9fed 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2320,7 +2320,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { pDb = mndAcquireDbByStb(pMnode, alterReq.name); if (pDb == NULL) { - terrno = TSDB_CODE_MND_INVALID_DB; + terrno = TSDB_CODE_MND_DB_NOT_EXIST; goto _OVER; } @@ -2342,9 +2342,9 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { alterReq.alterType, alterReq.numOfFields, alterReq.ttl); SName name = {0}; - tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB); + tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, alterReq.name, detail); + auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, name.tname, detail); _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { @@ -3616,7 +3616,7 @@ static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq) { pDb = mndAcquireDbByStb(pMnode, tagIdxReq.dbFName); if (pDb == NULL) { - terrno = TSDB_CODE_MND_INVALID_DB; + terrno = TSDB_CODE_MND_DB_NOT_EXIST; goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5752dae87b..66acbcc05b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -868,6 +868,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); taosThreadMutexLock(&execNodeList.lock); + mDebug("register to stream task node list"); keepStreamTasksInBuf(&streamObj, &execNodeList); taosThreadMutexUnlock(&execNodeList.lock); @@ -876,8 +877,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { char detail[2000] = {0}; sprintf(detail, "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 - ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 - ", maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, + ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64 + ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, @@ -1574,8 +1575,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } else if (taskStatus == TASK_STATUS__DROPPING) { memcpy(varDataVal(status), "dropping", 8); varDataSetLen(status, 8); - } else if (taskStatus == TASK_STATUS__FAIL) { - memcpy(varDataVal(status), "fail", 4); + } else if (taskStatus == TASK_STATUS__UNINIT) { + memcpy(varDataVal(status), "uninit", 6); varDataSetLen(status, 4); } else if (taskStatus == TASK_STATUS__STOP) { memcpy(varDataVal(status), "stop", 4); @@ -2016,14 +2017,11 @@ static 
int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); + const SEp* p = GET_ACTIVE_EP(pCurrent); - for (int32_t i = 0; i < pCurrent->numOfEps; ++i) { - const SEp *p = &(pCurrent->eps[i]); - if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { - return false; - } + if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { + return false; } - return true; } @@ -2120,6 +2118,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid); int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); if (code != TSDB_CODE_SUCCESS) { + sdbCancelFetch(pSdb, pIter); return code; } } @@ -2223,18 +2222,22 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { code = mndProcessVgroupChange(pMnode, &changeInfo); + + // keep the new vnode snapshot + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { + mDebug("create trans successfully, update cached node list"); + taosArrayDestroy(execNodeList.pNodeEntryList); + execNodeList.pNodeEntryList = pNodeSnapshot; + execNodeList.ts = ts; + } + } else { + mDebug("no update found in nodeList"); + taosArrayDestroy(pNodeSnapshot); } taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); - // keep the new vnode snapshot - if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { - taosArrayDestroy(execNodeList.pNodeEntryList); - execNodeList.pNodeEntryList = pNodeSnapshot; - execNodeList.ts = ts; - } - mDebug("end to do stream task node change checking"); atomic_store_32(&mndNodeCheckSentinel, 0); return 0; @@ -2284,7 +2287,6 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SStreamHbMsg req = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -2309,8 +2311,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - + int64_t k[2] = {p->streamId, p->taskId}; int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); if (index == NULL) { continue; diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 6ab6d364cb..b1b94c65fb 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -65,7 +65,7 @@ TEST_F(MndTestProfile, 01_ConnectMsg) { connId = connectRsp.connId; } -TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) { +TEST_F(MndTestProfile, 02_ConnectMsg_NotExistDB) { char passwd[] = "taosdata"; char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt); @@ -73,7 +73,7 @@ TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) { SConnectReq connectReq = {0}; connectReq.pid = 1234; strcpy(connectReq.app, "mnode_test_profile"); - strcpy(connectReq.db, "invalid_db"); + strcpy(connectReq.db, "not_exist_db"); strcpy(connectReq.user, "root"); 
strcpy(connectReq.passwd, secretEncrypt); strcpy(connectReq.sVer, version); @@ -84,7 +84,7 @@ TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DB); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DB_NOT_EXIST); ASSERT_EQ(pRsp->contLen, 0); } diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp index dd03917fc2..1adbb87e19 100644 --- a/source/dnode/mnode/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -448,7 +448,7 @@ TEST_F(MndTestStb, 02_Alter_Stb_AddTag) { { void* pReq = BuildAlterStbAddTagReq("1.d3.stb", "tag4", &contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); - ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DB); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DB_NOT_EXIST); } { @@ -665,7 +665,7 @@ TEST_F(MndTestStb, 06_Alter_Stb_AddColumn) { { void* pReq = BuildAlterStbAddColumnReq("1.d7.stb", "tag4", &contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); - ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_DB); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DB_NOT_EXIST); } { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 1146cfdc46..14ca7f3b02 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -165,6 +165,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); int32_t tqCheckAndRunStreamTask(STQ* pTq); +int32_t tqStartStreamTasks(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index d262567953..59416fd284 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -52,7 +52,9 @@ int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv int metaPrepareAsyncCommit(SMeta *pMeta) { // return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn); int code = 0; + metaWLock(pMeta); code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); + metaULock(pMeta); code = tdbCommit(pMeta->pEnv, pMeta->txn); return code; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 9a298a4bb7..18e1aa303b 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -931,21 +931,16 @@ end: } int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) { - metaWLock(pMeta); - int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); - if (ret != 0) { - metaError("ttl failed to flush, ret:%d", ret); - goto _err; - } + metaRLock(pMeta); + + int ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount); + + metaULock(pMeta); - ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount); if (ret != 0) { metaError("ttl failed to find expired table, ret:%d", ret); - goto _err; } -_err: - metaULock(pMeta); return ret; } diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index f920296b4a..58ecf54512 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -299,7 +299,7 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) { ret = 0; _out: - metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", 
ttlDays: %" PRId64, pTtlMgr->logPrefix, + metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix, updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays); return ret; @@ -323,7 +323,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) { ret = 0; _out: - metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid); + metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid); return ret; } @@ -363,17 +363,37 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime ret = 0; _out: - metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid, + metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid, pUpdCtimeCtx->changeTimeMs); return ret; } int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) { + int ret = -1; + STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX}; STtlExpiredCtx expiredCtx = { .ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids}; - return tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry); + ret = tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry); + if (ret) { + goto _out; + } + + size_t vIdx = 0; + for (size_t i = 0; i < pTbUids->size; i++) { + tb_uid_t *pUid = taosArrayGet(pTbUids, i); + if (taosHashGet(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t)) == NULL) { + // not in dirty && expired in tdb => must be expired + taosArraySet(pTbUids, vIdx, pUid); + vIdx++; + } + } + + taosArrayPopTailBatch(pTbUids, pTbUids->size - vIdx); + +_out: + return ret; } static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 447a7e2d90..4aee98148b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1416,7 +1416,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { // no lock needs to secure the access of the version if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { // discard all the data when the stream task is suspended. 
@@ -1700,20 +1700,47 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + streamSetStatusNormal(pTask); + + SStreamTask** ppHTask = NULL; + if (pTask->historyTaskId.taskId != 0) { + keys[0] = pTask->historyTaskId.streamId; + keys[1] = pTask->historyTaskId.taskId; + + ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + if (ppHTask == NULL || *ppHTask == NULL) { + tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", + pMeta->vgId, req.taskId); + } else { + tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); + streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); + } + } { - streamSetStatusNormal(pTask); streamMetaSaveTask(pMeta, pTask); + if (ppHTask != NULL) { + streamMetaSaveTask(pMeta, *ppHTask); + } + if (streamMetaCommit(pMeta) < 0) { // persist to disk } } streamTaskStop(pTask); + if (ppHTask != NULL) { + streamTaskStop(*ppHTask); + } + tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); pMeta->closedTask += 1; + if (ppHTask != NULL) { + pMeta->closedTask += 1; + } + // possibly only handle the stream task. int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); bool allStopped = (pMeta->closedTask == numOfTasks); if (allStopped) { @@ -1752,6 +1779,7 @@ _end: taosWUnLockLatch(&pMeta->lock); if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d, restart all stream tasks", vgId); + tqStartStreamTasks(pTq); tqCheckAndRunStreamTaskAsync(pTq); } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 58c7686aeb..eb587b8be2 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -224,6 +224,35 @@ int32_t tqStopStreamTasks(STQ* pTq) { return 0; } +int32_t tqStartStreamTasks(STQ* pTq) { + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = TD_VID(pTq->pVnode); + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + + tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); + + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + taosWLockLatch(&pMeta->lock); + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + + int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key)); + + int8_t status = (*pTask)->status.taskStatus; + if (status == TASK_STATUS__STOP) { + streamSetStatusNormal(*pTask); + } + } + + taosWUnLockLatch(&pMeta->lock); + return 0; +} + int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 2d58a10e51..4275e5626b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -248,10 +248,14 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t tDecoderClear(&decoder); // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) int64_t key[2] = {task.streamId, task.taskId}; + + taosWLockLatch(&pTq->pStreamMeta->lock); if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 
1, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { + taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } + taosWUnLockLatch(&pTq->pStreamMeta->lock); } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { // do nothing } diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index dc5e3649cc..7e5eb2c553 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -980,9 +980,7 @@ static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TS writer->ctx->brinBlkArray = NULL; writer->ctx->tbHasOldData = false; goto _exit; - } - - for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) { + } else { const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx); if (brinBlk->minTbid.uid != writer->ctx->tbid->uid) { @@ -995,7 +993,6 @@ static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TS writer->ctx->brinBlockIdx = 0; writer->ctx->brinBlkArrayIdx++; - break; } } @@ -1110,9 +1107,7 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA if (writer->ctx->brinBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->brinBlkArray)) { writer->ctx->brinBlkArray = NULL; break; - } - - for (; writer->ctx->brinBlkArrayIdx < TARRAY2_SIZE(writer->ctx->brinBlkArray); writer->ctx->brinBlkArrayIdx++) { + } else { const SBrinBlk *brinBlk = TARRAY2_GET_PTR(writer->ctx->brinBlkArray, writer->ctx->brinBlkArrayIdx); code = tsdbDataFileReadBrinBlock(writer->ctx->reader, brinBlk, writer->ctx->brinBlock); @@ -1120,7 +1115,6 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA writer->ctx->brinBlockIdx = 0; writer->ctx->brinBlkArrayIdx++; - break; } } @@ -1251,9 +1245,7 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom if (writer->ctx->tombBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->tombBlkArray)) { writer->ctx->hasOldTomb = false; break; - } - - for (; writer->ctx->tombBlkArrayIdx < TARRAY2_SIZE(writer->ctx->tombBlkArray); ++writer->ctx->tombBlkArrayIdx) { + } else { const STombBlk *tombBlk = TARRAY2_GET_PTR(writer->ctx->tombBlkArray, writer->ctx->tombBlkArrayIdx); code = tsdbDataFileReadTombBlock(writer->ctx->reader, tombBlk, writer->ctx->tombBlock); @@ -1261,7 +1253,6 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom writer->ctx->tombBlockIdx = 0; writer->ctx->tombBlkArrayIdx++; - break; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index a997c3eea5..f43bb52d05 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -174,11 +174,17 @@ int32_t save_fs(const TFileSetArray *arr, const char *fname) { // fset cJSON *ajson = cJSON_AddArrayToObject(json, "fset"); - if (!ajson) TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); + if (!ajson) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } const STFileSet *fset; TARRAY2_FOREACH(arr, fset) { cJSON *item = cJSON_CreateObject(); - if (!item) TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); + if (!item) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } cJSON_AddItemToArray(ajson, item); code = tsdbTFileSetToJson(fset, item); @@ -231,7 +237,8 @@ static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray 
*arr) { TSDB_CHECK_CODE(code, lino, _exit); } } else { - TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit); + code = TSDB_CODE_FILE_CORRUPTED; + TSDB_CHECK_CODE(code, lino, _exit); } _exit: @@ -312,7 +319,8 @@ static int32_t commit_edit(STFileSystem *fs) { int32_t code; int32_t lino; if ((code = taosRenameFile(current_t, current))) { - TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit); + code = TAOS_SYSTEM_ERROR(code); + TSDB_CHECK_CODE(code, lino, _exit); } code = apply_commit(fs); @@ -345,7 +353,8 @@ static int32_t abort_edit(STFileSystem *fs) { int32_t code; int32_t lino; if ((code = taosRemoveFile(fname))) { - TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit); + code = TAOS_SYSTEM_ERROR(code); + TSDB_CHECK_CODE(code, lino, _exit); } code = apply_abort(fs); @@ -398,7 +407,7 @@ static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry)); if (entry == NULL) return TSDB_CODE_OUT_OF_MEMORY; - strcpy(entry->fname, fname); + strncpy(entry->fname, fname, TSDB_FILENAME_LEN); uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket; @@ -873,7 +882,7 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) { STFileSet *fset1; fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray)); - if (fsetArr == NULL) return TSDB_CODE_OUT_OF_MEMORY; + if (fsetArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; TARRAY2_INIT(fsetArr[0]); diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index 13ef7b3ba6..37c7e2ffc1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -46,7 +46,8 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl return code; } - TARRAY2_APPEND(lvl[0]->fobjArr, fobj); + code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj); + if (code) return code; } return 0; } @@ -185,7 +186,8 @@ static int32_t tsdbJsonToSttLvl(STsdb *pTsdb, const cJSON *json, SSttLvl **lvl) return code; } - TARRAY2_APPEND(lvl[0]->fobjArr, fobj); + code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj); + if (code) return code; } return 0; } @@ -263,7 +265,8 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) { return code; } - TARRAY2_APPEND((*fset)->lvlArr, lvl); + code = TARRAY2_APPEND((*fset)->lvlArr, lvl); + if (code) return code; } } else { return TSDB_CODE_FILE_CORRUPTED; @@ -326,11 +329,12 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { STFileObj tfobj = {.f[0] = {.cid = op->of.cid}}, *tfobjp = &tfobj; STFileObj **fobjPtr = TARRAY2_SEARCH(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ); - tfobjp = (fobjPtr ? 
*fobjPtr : NULL); - - ASSERT(tfobjp); - - tfobjp->f[0] = op->nf; + if (fobjPtr) { + tfobjp = *fobjPtr; + tfobjp->f[0] = op->nf; + } else { + tsdbError("file not found, cid:%" PRId64, op->of.cid); + } } else { fset->farr[op->nf.type]->f[0] = op->nf; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index 585316469a..3d8964d41b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -42,8 +42,12 @@ static const struct { }; void remove_file(const char *fname) { - taosRemoveFile(fname); - tsdbInfo("file:%s is removed", fname); + int32_t code = taosRemoveFile(fname); + if (code) { + tsdbError("file:%s remove failed", fname); + } else { + tsdbInfo("file:%s is removed", fname); + } } static int32_t tfile_to_json(const STFile *file, cJSON *json) { diff --git a/source/dnode/vnode/src/tsdb/tsdbIter.c b/source/dnode/vnode/src/tsdb/tsdbIter.c index 9780cc6be6..447832108d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbIter.c @@ -356,7 +356,8 @@ static int32_t tsdbSttIterOpen(STsdbIter *iter) { } iter->sttData->sttBlkArrayIdx = 0; - tBlockDataCreate(iter->sttData->blockData); + code = tBlockDataCreate(iter->sttData->blockData); + if (code) return code; iter->sttData->blockDataIdx = 0; return tsdbSttIterNext(iter, NULL); @@ -381,7 +382,8 @@ static int32_t tsdbDataIterOpen(STsdbIter *iter) { iter->dataData->brinBlockIdx = 0; // SBlockData - tBlockDataCreate(iter->dataData->blockData); + code = tBlockDataCreate(iter->dataData->blockData); + if (code) return code; iter->dataData->blockDataIdx = 0; return tsdbDataIterNext(iter, NULL); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 8601248a69..ed4257b86d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -857,6 +857,9 @@ static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecor } else { break; } + + code = tsdbIterMergerNext(writer->ctx->tombIterMerger); + TSDB_CHECK_CODE(code, lino, _exit); } if (record->suid == INT64_MAX) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0b7f969ed7..22d5d40877 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -160,7 +160,7 @@ static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) { } { // find expired uids - tbUids = taosArrayInit(8, sizeof(int64_t)); + tbUids = taosArrayInit(8, sizeof(tb_uid_t)); if (tbUids == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); @@ -945,7 +945,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId; char detail[1000] = {0}; - sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d", + sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d", pCreateReq->btime, pCreateReq->flags, pCreateReq->ttl, pCreateReq->type); SName name = {0}; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d580b41093..43850ebfee 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -560,6 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); } else { vInfo("vgId:%d start to launch stream tasks", 
pVnode->config.vgId); + tqStartStreamTasks(pVnode->pTq); tqCheckAndRunStreamTaskAsync(pVnode->pTq); } } else { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 44de83b7ef..979085c397 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -760,12 +760,14 @@ int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid char *stb = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid)); if (NULL == stb) { ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName); + ctgReleaseDBCache(pCtg, dbCache); return TSDB_CODE_SUCCESS; } *stbName = taosStrdup(stb); taosHashRelease(dbCache->stbCache, stb); + ctgReleaseDBCache(pCtg, dbCache); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 991e2013f5..8b868ffde4 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -308,10 +308,11 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch if (retentions) { len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions); - taosMemoryFree(retentions); } } + taosMemoryFree(retentions); + (varDataLen(buf2)) = len; colDataSetVal(pCol2, 0, buf2, false); diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 38d82f05fb..e47cbb7eba 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -446,7 +446,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat taosThreadMutexInit(&inserter->mutex, NULL); if (NULL == inserter->pDataBlocks) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_OUT_OF_MEMORY; + goto _return; } inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index f2ed4ba618..8fc46e0239 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -151,14 +151,21 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { + freeOperatorParam(pChild, OP_GET_PARAM); return TSDB_CODE_OUT_OF_MEMORY; } if (pChild) { (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES); - if (NULL == *ppRes) { + if (NULL == (*ppRes)->pChildren) { + freeOperatorParam(pChild, OP_GET_PARAM); + freeOperatorParam(*ppRes, OP_GET_PARAM); + *ppRes = NULL; return TSDB_CODE_OUT_OF_MEMORY; } if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) { + freeOperatorParam(pChild, OP_GET_PARAM); + freeOperatorParam(*ppRes, OP_GET_PARAM); + *ppRes = NULL; return TSDB_CODE_OUT_OF_MEMORY; } } else { @@ -167,6 +174,8 @@ static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t down SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam)); if (NULL == pGc) { + freeOperatorParam(*ppRes, OP_GET_PARAM); + *ppRes = NULL; return TSDB_CODE_OUT_OF_MEMORY; } @@ -193,6 +202,7 @@ static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_ SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam)); if (NULL == pGc) { + freeOperatorParam(*ppRes, OP_NOTIFY_PARAM); return 
TSDB_CODE_OUT_OF_MEMORY; } @@ -248,6 +258,7 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam)); if (NULL == pExc) { + taosMemoryFreeClear(*ppRes); return TSDB_CODE_OUT_OF_MEMORY; } @@ -255,6 +266,7 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); if (NULL == pExc->pBatchs) { taosMemoryFree(pExc); + taosMemoryFreeClear(*ppRes); return TSDB_CODE_OUT_OF_MEMORY; } tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam); @@ -288,21 +300,36 @@ static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t d static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam* pChild0, SOperatorParam* pChild1) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { + freeOperatorParam(pChild0, OP_GET_PARAM); + freeOperatorParam(pChild1, OP_GET_PARAM); return TSDB_CODE_OUT_OF_MEMORY; } (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); if (NULL == *ppRes) { + freeOperatorParam(pChild0, OP_GET_PARAM); + freeOperatorParam(pChild1, OP_GET_PARAM); + freeOperatorParam(*ppRes, OP_GET_PARAM); + *ppRes = NULL; return TSDB_CODE_OUT_OF_MEMORY; } if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { + freeOperatorParam(pChild0, OP_GET_PARAM); + freeOperatorParam(pChild1, OP_GET_PARAM); + freeOperatorParam(*ppRes, OP_GET_PARAM); + *ppRes = NULL; return TSDB_CODE_OUT_OF_MEMORY; } if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { + freeOperatorParam(pChild1, OP_GET_PARAM); + freeOperatorParam(*ppRes, OP_GET_PARAM); + *ppRes = NULL; return TSDB_CODE_OUT_OF_MEMORY; } SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam)); if (NULL == pJoin) { + freeOperatorParam(*ppRes, OP_GET_PARAM); + *ppRes = NULL; return TSDB_CODE_OUT_OF_MEMORY; } @@ -318,16 +345,28 @@ static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initPara static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { + freeOperatorParam(pChild0, OP_NOTIFY_PARAM); + freeOperatorParam(pChild1, OP_NOTIFY_PARAM); return TSDB_CODE_OUT_OF_MEMORY; } (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); if (NULL == *ppRes) { + taosMemoryFreeClear(*ppRes); + freeOperatorParam(pChild0, OP_NOTIFY_PARAM); + freeOperatorParam(pChild1, OP_NOTIFY_PARAM); return TSDB_CODE_OUT_OF_MEMORY; } if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { + freeOperatorParam(*ppRes, OP_NOTIFY_PARAM); + freeOperatorParam(pChild0, OP_NOTIFY_PARAM); + freeOperatorParam(pChild1, OP_NOTIFY_PARAM); + *ppRes = NULL; return TSDB_CODE_OUT_OF_MEMORY; } if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { + freeOperatorParam(*ppRes, OP_NOTIFY_PARAM); + freeOperatorParam(pChild1, OP_NOTIFY_PARAM); + *ppRes = NULL; return TSDB_CODE_OUT_OF_MEMORY; } @@ -420,13 +459,34 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS if (TSDB_CODE_SUCCESS == code) { code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0); + pSrcParam0 = NULL; } if (TSDB_CODE_SUCCESS == code) { code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, 
pPost->rightNeedCache, pSrcParam1); + pSrcParam1 = NULL; } if (TSDB_CODE_SUCCESS == code) { code = buildMergeJoinOperatorParam(ppParam, pSrcParam0 ? true : false, pGcParam0, pGcParam1); } + if (TSDB_CODE_SUCCESS != code) { + if (pSrcParam0) { + freeOperatorParam(pSrcParam0, OP_GET_PARAM); + } + if (pSrcParam1) { + freeOperatorParam(pSrcParam1, OP_GET_PARAM); + } + if (pGcParam0) { + freeOperatorParam(pGcParam0, OP_GET_PARAM); + } + if (pGcParam1) { + freeOperatorParam(pGcParam1, OP_GET_PARAM); + } + if (*ppParam) { + freeOperatorParam(*ppParam, OP_GET_PARAM); + *ppParam = NULL; + } + } + return code; } @@ -488,7 +548,7 @@ static void handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCt if (pPost->leftNeedCache) { uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); - if (--(*num) <= 0) { + if (num && --(*num) <= 0) { tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); notifySeqJoinTableCacheEnd(pOperator, pPost, true); } diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index d61034c26e..3cfd0ab582 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -277,7 +277,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶m2); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _return; } int32_t status2 = 0; @@ -331,10 +331,12 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p } } +_return: + colDataDestroy(ps); taosMemoryFree(ps); colDataDestroy(pe); taosMemoryFree(pe); - return TSDB_CODE_SUCCESS; + return code; } diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index ff771ccc8c..d20ad1d67c 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -223,6 +223,9 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S SGroupCacheFileInfo newFile = {0}; taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)); pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); + if (NULL == pTmp) { + return TSDB_CODE_OUT_OF_MEMORY; + } } if (pTmp->deleted) { @@ -287,7 +290,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC if (deleted) { qTrace("FileId:%d-%d-%d already be deleted, skip write", - pCtx->id, pGroup->vgId, pHead->basic.fileId); + pCtx->id, pGroup ? 
pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId); int64_t blkId = pHead->basic.blkId; pHead = pHead->next; @@ -337,7 +340,9 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr return TSDB_CODE_OUT_OF_MEMORY; } pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId)); - + if (NULL == pBufInfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } int32_t code = TSDB_CODE_SUCCESS; SGcBlkBufInfo* pWriteHead = NULL; @@ -378,6 +383,10 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr static FORCE_INLINE void chkRemoveVgroupCurrFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId) { SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId)); + if (NULL == pFileInfo) { + return; + } + if (0 == pFileInfo->groupNum) { removeGroupCacheFile(pFileInfo); @@ -711,6 +720,9 @@ static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int newFile.groupNum = 1; taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)); pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); + if (NULL == pTmp) { + return TSDB_CODE_OUT_OF_MEMORY; + } } else { pTmp->groupNum++; } @@ -786,6 +798,9 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* } *ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid)); + if (NULL == *ppGrp) { + return TSDB_CODE_OUT_OF_MEMORY; + } initNewGroupData(pCtx, *ppGrp, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache); qError("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index d57f9f9f90..f382283d27 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -636,12 +636,14 @@ static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, S int32_t code = getValBufFromPages(pJoin->pRowBufs, getHJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow); if (code) { + taosMemoryFree(pRow); return code; } if (NULL == pGroup) { pRow->next = NULL; if (tSimpleHashPut(pJoin->pKeyHash, pTable->keyData, keyLen, &group, sizeof(group))) { + taosMemoryFree(pRow); return TSDB_CODE_OUT_OF_MEMORY; } } else { diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 1b286f9bdd..2348a3c97b 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -711,6 +711,11 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs } } } + + if (NULL == pJoinInfo->pLeft || NULL == pJoinInfo->pRight) { + setMergeJoinDone(pOperator); + return false; + } // only the timestamp match support for ordinary table SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e96ed87d3e..12ec63b4cb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2883,7 +2883,7 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) { } -static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) { +static int32_t 
tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) { int32_t code = 0; int32_t numOfTables = taosArrayGetSize(aUidTags); @@ -2894,9 +2894,15 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; SScalarParam output = {0}; - tagScanCreateResultData(&type, numOfTables, &output); + code = tagScanCreateResultData(&type, numOfTables, &output); + if (TSDB_CODE_SUCCESS != code) { + return code; + } - scalarCalculate(pTagCond, pBlockList, &output); + code = scalarCalculate(pTagCond, pBlockList, &output); + if (TSDB_CODE_SUCCESS != code) { + return code; + } bool* result = (bool*)output.columnData->pData; for (int32_t i = 0 ; i < numOfTables; ++i) { @@ -2911,7 +2917,7 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF blockDataDestroy(pResBlock); taosArrayDestroy(pBlockList); - + return TSDB_CODE_SUCCESS; } static void tagScanFillOneCellWithTag(SOperatorInfo* pOperator, const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) { @@ -3024,7 +3030,11 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { bool ignoreFilterIdx = true; if (pInfo->pTagCond != NULL) { ignoreFilterIdx = false; - tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo); + int32_t code = tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } } else { ignoreFilterIdx = true; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 9bd0991435..a59a7bb1ea 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -669,8 +669,13 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* p->info.id.groupId = tupleGroupId; pInfo->groupId = tupleGroupId; } else { - pInfo->prefetchedTuple = pTupleHandle; - break; + if (p->info.rows == 0) { + appendOneRowToDataBlock(p, pTupleHandle); + p->info.id.groupId = pInfo->groupId = tupleGroupId; + } else { + pInfo->prefetchedTuple = pTupleHandle; + break; + } } } else { appendOneRowToDataBlock(p, pTupleHandle); @@ -715,14 +720,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData resetLimitInfoForNextGroup(&pInfo->limitInfo); } - bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); - // if limit is reached within a group, do not clear limiInfo otherwise the next block - // will be processed. 
- if (newgroup && limitReached) { - resetLimitInfoForNextGroup(&pInfo->limitInfo); - } + applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); - if (p->info.rows > 0 || limitReached) { + if (p->info.rows > 0) { break; } } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 4e0dff9d4f..44d39392a2 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -578,9 +578,8 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId); int64_t* tsList = (int64_t*)pCol->pData; TSKEY lastKey = tsList[pFillInfo->numOfRows - 1]; - numOfRes = taosTimeCountInterval(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding, - pFillInfo->interval.slidingUnit, pFillInfo->interval.precision); - numOfRes += 1; + numOfRes = taosTimeCountIntervalForFill(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding, + pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order); ASSERT(numOfRes >= numOfRows); } else { // reach the end of data if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) || @@ -588,9 +587,8 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma return 0; } - numOfRes = taosTimeCountInterval(ekey1, pFillInfo->currentKey, pFillInfo->interval.sliding, - pFillInfo->interval.slidingUnit, pFillInfo->interval.precision); - numOfRes += 1; + numOfRes = taosTimeCountIntervalForFill(ekey1, pFillInfo->currentKey, pFillInfo->interval.sliding, + pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order); } return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 6c4a780dfb..287c824540 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -904,6 +904,7 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx } static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) { + int32_t code = TSDB_CODE_SUCCESS; int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz); blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); @@ -930,7 +931,13 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); SMultiwayMergeTreeInfo* pTree = NULL; - tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn); + code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn); + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(sup.aRowIdx); + taosMemoryFree(sup.aTs); + + return code; + } int32_t nRows = 0; int32_t nMergedRows = 0; bool mergeLimitReached = false; @@ -1054,7 +1061,14 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { tSimpleHashClear(mUidBlk); int64_t p = taosGetTimestampUs(); - sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + if (code != TSDB_CODE_SUCCESS) { + tSimpleHashCleanup(mUidBlk); + taosArrayDestroy(aBlkSort); + taosArrayDestroy(aExtSrc); + return code; + } + int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; diff --git a/source/libs/function/src/builtins.c 
b/source/libs/function/src/builtins.c index 256123d62b..aad08fc337 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1786,7 +1786,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType && + if (!IS_INTEGER_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType && !IS_TIMESTAMP_TYPE(colType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } @@ -1815,6 +1815,8 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { uint8_t resType; if (IS_SIGNED_NUMERIC_TYPE(colType) || IS_TIMESTAMP_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType) { resType = TSDB_DATA_TYPE_BIGINT; + } else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) { + resType = TSDB_DATA_TYPE_UBIGINT; } else { resType = TSDB_DATA_TYPE_DOUBLE; } @@ -1839,10 +1841,6 @@ static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - if (TSDB_DATA_TYPE_VARBINARY == ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index bcbb3af950..108a641c08 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2714,16 +2714,20 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_BOOL: pDiffInfo->prev.i64 = *(bool*)pv ? 
1 : 0; break; + case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_TINYINT: pDiffInfo->prev.i64 = *(int8_t*)pv; break; + case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_INT: pDiffInfo->prev.i64 = *(int32_t*)pv; break; + case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_SMALLINT: pDiffInfo->prev.i64 = *(int16_t*)pv; break; case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_BIGINT: pDiffInfo->prev.i64 = *(int64_t*)pv; break; @@ -2745,6 +2749,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, int64_t ts) { pDiffInfo->prevTs = ts; switch (type) { + case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_INT: { int32_t v = *(int32_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null @@ -2758,6 +2763,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, break; } case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_TINYINT: { int8_t v = *(int8_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null @@ -2769,6 +2775,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, pDiffInfo->prev.i64 = v; break; } + case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_SMALLINT: { int16_t v = *(int16_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null @@ -2781,6 +2788,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, break; } case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_BIGINT: { int64_t v = *(int64_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index bd459af9f5..c8eb7580ed 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -69,7 +69,7 @@ const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const void udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strncpy(processFuncName, udfName, sizeof(processFuncName)); + snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); char startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; @@ -103,7 +103,7 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { if (udf->funcType == UDF_FUNC_TYPE_SCALAR) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strncpy(processFuncName, udfName, sizeof(processFuncName)); + snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); } else if (udf->funcType == UDF_FUNC_TYPE_AGG) { udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 9d5a300c23..8008f4397e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2902,7 +2902,7 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) { taosCreateMD5Hash(buf, len); strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1); len = snprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pCol->colName); - taosCreateMD5Hash(buf, len); + // note: userAlias could be truncated here strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1); } } else { @@ -2910,7 +2910,7 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, 
SExprNode* pExpr) { taosCreateMD5Hash(buf, len); strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1); len = snprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pExpr->userAlias); - taosCreateMD5Hash(buf, len); + // note: userAlias could be truncated here strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index d460c8074d..96d253494d 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -46,8 +46,8 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol, bool isPartit pCol->colType = COLUMN_TYPE_TBNAME; SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 0); if (pVal) { - strcpy(pCol->tableName, pVal->literal); - strcpy(pCol->tableAlias, pVal->literal); + snprintf(pCol->tableName, sizeof(pCol->tableName), "%s", pVal->literal); + snprintf(pCol->tableAlias, sizeof(pCol->tableAlias), "%s", pVal->literal); } break; case FUNCTION_TYPE_WSTART: @@ -531,6 +531,9 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = nodesListStrictAppend(pJoin->node.pChildren, (SNode*)pLeft); } + if (TSDB_CODE_SUCCESS != code) { + pLeft = NULL; + } } SLogicNode* pRight = NULL; @@ -584,7 +587,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect } } - if (NULL == pJoin->node.pTargets) { + if (NULL == pJoin->node.pTargets && NULL != pLeft) { pJoin->node.pTargets = nodesCloneList(pLeft->pTargets); if (NULL == pJoin->node.pTargets) { code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 39becca781..916cc6e9ee 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -391,8 +391,8 @@ static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; if (streamTaskShouldStop(&pTask->status)) { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); return; } @@ -409,17 +409,22 @@ static void doRetryDispatchData(void* param, void* tmrId) { streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } } else { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); } } else { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); } } void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { - qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration); - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); + qWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration); + if (pTask->launchTaskTimer != NULL) { + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); + } else { + pTask->launchTaskTimer = 
taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); + } } int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, @@ -540,8 +545,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", - pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + + qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", + pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } @@ -995,8 +1002,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data", - id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d", + id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 635915aeb6..5bc21286d7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -666,6 +666,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { + // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader + // In this case, we try not to start fill-history task anymore. 
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 4b86b9713c..743d87e938 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -235,7 +235,13 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str); streamTaskLaunchScanHistory(pTask); } else { - qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + if (pTask->info.fillHistory == 1) { + qDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); + pTask->status.taskStatus = TASK_STATUS__DROPPING; + ASSERT(pTask->historyTaskId.taskId == 0); + } else { + qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + } } // when current stream task is ready, check the related fill history task. @@ -579,19 +585,17 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // todo fix the bug: 2. race condition // an fill history task needs to be started. int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { - int32_t tId = pTask->historyTaskId.taskId; - if (tId == 0) { + SStreamMeta* pMeta = pTask->pMeta; + int32_t hTaskId = pTask->historyTaskId.taskId; + if (hTaskId == 0) { return TSDB_CODE_SUCCESS; } ASSERT(pTask->status.downstreamReady == 1); qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, - pTask->historyTaskId.streamId, tId); + pTask->historyTaskId.streamId, hTaskId); - SStreamMeta* pMeta = pTask->pMeta; - int32_t hTaskId = pTask->historyTaskId.taskId; - - int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId}; + int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId}; // Set the execute conditions, including the query time window and the version range SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); @@ -610,11 +614,12 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { // todo failed to create timer taosMemoryFree(pInfo); } else { - atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active + int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active + ASSERT(ref == 1); qDebug("s-task:%s set timer active flag", pTask->id.idStr); } } else { // timer exists - ASSERT(pTask->status.timerActive > 0); + ASSERT(pTask->status.timerActive == 1); qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); } @@ -918,6 +923,13 @@ void streamTaskHalt(SStreamTask* pTask) { return; } + // wait for checkpoint completed + while(pTask->status.taskStatus == TASK_STATUS__CK) { + qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr, + streamGetTaskStatusStr(TASK_STATUS__CK)); + taosMsleep(1000); + } + // upgrade to halt status if (status == TASK_STATUS__PAUSE) { qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1ff266ad8f..edecfcb2bc 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2562,7 
+2562,11 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex); } - SSyncLogReplMgr oldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0}; + SSyncLogReplMgr* oldLogReplMgrs = NULL; + int64_t length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA); + oldLogReplMgrs = taosMemoryMalloc(length); + if (NULL == oldLogReplMgrs) return -1; + memset(oldLogReplMgrs, 0, length); for(int i = 0; i < oldtotalReplicaNum; i++){ oldLogReplMgrs[i] = *(ths->logReplMgrs[i]); @@ -2643,6 +2647,8 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum } } + taosMemoryFree(oldLogReplMgrs); + return 0; } diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index a48bae002e..7a62b38b16 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -2200,10 +2200,15 @@ int tdbBtcDelete(SBTC *pBtc) { tdbOsFree(pCell); if (pPage->nOverflow > 0) { - tdbDebug("tdb/btc-delete: btree balance after update cell, pPage/nOverflow: %p/%d.", pPage, - pPage->nOverflow); + tdbDebug("tdb/btc-delete: btree balance after update cell, pPage/nOverflow/pgno: %p/%d/%" PRIu32 ".", pPage, + pPage->nOverflow, TDB_PAGE_PGNO(pPage)); - pBtc->iPage = iPage; + tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn); + while (--pBtc->iPage != iPage) { + tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pgStack[pBtc->iPage], pBtc->pTxn); + } + + // pBtc->iPage = iPage; pBtc->pPage = pPage; ret = tdbBtreeBalance(pBtc); if (ret < 0) { diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index 262f3d27e6..5b8c407e95 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -295,7 +295,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) } // 1. pPage == NULL - // 2. pPage && pPage->isLocal == 0 && !TDB_TXN_IS_WRITE(pTxn) + // 2. 
pPage && !pPage->isLocal == 0 && !TDB_TXN_IS_WRITE(pTxn) pPageH = pPage; pPage = NULL; diff --git a/tests/docs-examples-test/python.sh b/tests/docs-examples-test/python.sh index 2a44ee7552..5de7261e02 100644 --- a/tests/docs-examples-test/python.sh +++ b/tests/docs-examples-test/python.sh @@ -86,7 +86,7 @@ pip3 install kafka-python python3 kafka_example_consumer.py # 21 -pip3 install taos-ws-py +pip3 install taos-ws-py==0.2.6 python3 conn_websocket_pandas.py # 22 diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index e8aabef8e4..cd37462cde 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -59,6 +59,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh index 206f99ff3d..94928e74ae 100755 --- a/tests/parallel_test/run_case.sh +++ b/tests/parallel_test/run_case.sh @@ -81,6 +81,11 @@ pip3 list|grep taospy pip3 uninstall taospy -y pip3 install --default-timeout=120 taospy==2.7.10 +#define taos-ws-py 0.2.8 +pip3 list|grep taos-ws-py +pip3 uninstall taos-ws-py -y +pip3 install --default-timeout=120 taos-ws-py==0.2.8 + $TIMEOUT_CMD $cmd RET=$? echo "cmd exit code: $RET" diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index cd873ca0f2..c4fc1ce654 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -668,7 +668,7 @@ class TDDnodes: self.testCluster = False self.valgrind = 0 self.asan = False - self.killValgrind = 1 + self.killValgrind = 0 def init(self, path, remoteIP = ""): binPath = self.dnodes[0].getPath() + "/../../../" @@ -775,9 +775,41 @@ class TDDnodes: tdLog.info("execute finished") return + def killProcesser(self, processerName): + if platform.system().lower() == 'windows': + killCmd = ("wmic process where name=\"%s.exe\" call terminate > NUL 2>&1" % processerName) + psCmd = ("wmic process where name=\"%s.exe\" | findstr \"%s.exe\"" % (processerName, processerName)) + else: + killCmd = ( + "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1" + % processerName + ) + psCmd = ("ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % processerName) + + processID = "" + + try: + processID = subprocess.check_output(psCmd, shell=True) + while processID: + os.system(killCmd) + time.sleep(1) + try: + processID = subprocess.check_output(psCmd, shell=True) + except Exception as err: + processID = "" + tdLog.debug('**** kill pid warn: {err}') + except Exception as err: + processID = "" + tdLog.debug(f'**** find pid warn: {err}') + + + def stopAll(self): tdLog.info("stop all dnodes, asan:%d" % self.asan) - distro_id = distro.id() + if platform.system().lower() != 'windows': + distro_id = distro.id() + else: + distro_id = "not alpine" if self.asan and distro_id != "alpine": tdLog.info("execute script: %s" % self.stopDnodesPath) os.system(self.stopDnodesPath) @@ -792,7 +824,6 @@ class TDDnodes: if (distro_id == "alpine"): - print(distro_id) psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | 
awk '{print $2}' | xargs" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() while(processID): @@ -803,36 +834,9 @@ class TDDnodes: processID = subprocess.check_output( psCmd, shell=True).decode("utf-8").strip() elif platform.system().lower() == 'windows': - psCmd = "for /f %a in ('wmic process where \"name='taosd.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() - while(processID): - print(f"pid of taosd.exe:{processID}") - killCmd = "kill -9 %s > nul 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8").strip() - - psCmd = "for /f %a in ('wmic process where \"name='tmq_sim.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() - while(processID): - print(f"pid of tmq_sim.exe:{processID}") - killCmd = "kill -9 %s > nul 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8").strip() - - psCmd = "for /f %a in ('wmic process where \"name='taosBenchmark.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)" - processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() - while(processID): - print(f"pid of taosBenchmark.exe:{processID}") - killCmd = "kill -9 %s > nul 2>&1" % processID - os.system(killCmd) - time.sleep(1) - processID = subprocess.check_output( - psCmd, shell=True).decode("utf-8").strip() - + self.killProcesser("taosd") + self.killProcesser("tmq_sim") + self.killProcesser("taosBenchmark") else: psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() @@ -849,7 +853,6 @@ class TDDnodes: time.sleep(1) processID = subprocess.check_output( psCmd, shell=True).decode("utf-8").strip() - if self.killValgrind == 1: psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() diff --git a/tests/script/tsim/query/multires_func.sim b/tests/script/tsim/query/multires_func.sim new file mode 100644 index 0000000000..34aadffe2e --- /dev/null +++ b/tests/script/tsim/query/multires_func.sim @@ -0,0 +1,20 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql create database test +sql use test +sql CREATE TABLE `tb` (`ts` TIMESTAMP, `c0` INT, `c1` FLOAT, `c2` BINARY(10)) + + +sql insert into tb values("2022-05-15 00:01:08.000", 1, 1.0, "abc") +sql insert into tb values("2022-05-16 00:01:08.000", 2, 2.0, "bcd") +sql insert into tb values("2022-05-17 00:01:08.000", 3, 3.0, "cde") + + +#sleep 10000000 +system taos -P7100 -s 'source tsim/query/t/multires_func.sql' | grep -v 'Query OK' | grep -v 'Client Version' > /tmp/multires_func.result +system echo ----------------------diff start----------------------- +system git diff --exit-code --color tsim/query/r/multires_func.result /tmp/multires_func.result +system echo ----------------------diff succeed----------------------- diff --git a/tests/script/tsim/query/r/multires_func.result 
b/tests/script/tsim/query/r/multires_func.result new file mode 100644 index 0000000000..c221b00b97 --- /dev/null +++ b/tests/script/tsim/query/r/multires_func.result @@ -0,0 +1,31 @@ +Copyright (c) 2022 by TDengine, all rights reserved. + +taos> source tsim/query/t/multires_func.sql +taos> use test; +Database changed. + +taos> select count(*) from tb\G; +*************************** 1.row *************************** +count(*): 3 + +taos> select last(*) from tb\G; +*************************** 1.row *************************** +ts: 2022-05-17 00:01:08.000 +c0: 3 +c1: 3.0000000 +c2: cde + +taos> select last_row(*) from tb\G; +*************************** 1.row *************************** +ts: 2022-05-17 00:01:08.000 +c0: 3 +c1: 3.0000000 +c2: cde + +taos> select first(*) from tb\G; +*************************** 1.row *************************** +ts: 2022-05-15 00:01:08.000 +c0: 1 +c1: 1.0000000 +c2: abc + diff --git a/tests/script/tsim/query/t/multires_func.sql b/tests/script/tsim/query/t/multires_func.sql new file mode 100644 index 0000000000..6a191233b9 --- /dev/null +++ b/tests/script/tsim/query/t/multires_func.sql @@ -0,0 +1,5 @@ +use test; +select count(*) from tb\G; +select last(*) from tb\G; +select last_row(*) from tb\G; +select first(*) from tb\G; diff --git a/tests/system-test/0-others/splitVGroup.py b/tests/system-test/0-others/splitVGroup.py index 9fd00892e4..ed80505ce2 100644 --- a/tests/system-test/0-others/splitVGroup.py +++ b/tests/system-test/0-others/splitVGroup.py @@ -198,7 +198,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/0-others/timeRangeWise.py b/tests/system-test/0-others/timeRangeWise.py index a7dc18aa82..5ef5aa4a75 100644 --- a/tests/system-test/0-others/timeRangeWise.py +++ b/tests/system-test/0-others/timeRangeWise.py @@ -210,7 +210,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/0-others/ttl.py b/tests/system-test/0-others/ttl.py new file mode 100644 index 0000000000..7de309c135 --- /dev/null +++ b/tests/system-test/0-others/ttl.py @@ -0,0 +1,37 @@ +import time +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + + +class TDTestCase: + updatecfgDict = {'ttlUnit': 1, "ttlPushInterval": 1, "ttlChangeOnWrite": 0} + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + self.ttl = 5 + self.dbname = "test" + + def check_ttl_result(self): + tdSql.execute(f'create database {self.dbname}') + tdSql.execute(f'create table {self.dbname}.t1(ts timestamp, c1 int)') + tdSql.execute(f'create table {self.dbname}.t2(ts timestamp, c1 int) ttl {self.ttl}') + tdSql.query(f'show {self.dbname}.tables') + tdSql.checkRows(2) + tdSql.execute(f'flush database {self.dbname}') + time.sleep(self.ttl + 2) + tdSql.query(f'show {self.dbname}.tables') + tdSql.checkRows(1) + + def run(self): + self.check_ttl_result() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) 
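The new ttl.py case above exercises table-level TTL under ttlUnit=1 and ttlPushInterval=1: a table created with TTL 5 should disappear from `show tables` a few seconds after `flush database`. As a rough standalone sketch of the same check outside the test framework (host and database name are assumptions; it uses the taospy connector pinned elsewhere in this patch):

import time
import taos  # taospy native connector

# Assumes a local dnode started with ttlUnit=1 and ttlPushInterval=1,
# matching the updatecfgDict used by the ttl.py case above.
conn = taos.connect(host="localhost")
cur = conn.cursor()
cur.execute("create database if not exists ttl_demo")
cur.execute("create table ttl_demo.t_ttl (ts timestamp, c1 int) ttl 5")  # eligible for drop ~5s after creation
cur.execute("flush database ttl_demo")
time.sleep(5 + 2)  # ttl plus a push-interval margin, mirroring check_ttl_result()
cur.execute("show ttl_demo.tables")
print(len(cur.fetchall()))  # expected 0 once the TTL sweep has dropped the table

This mirrors check_ttl_result(): create with TTL, flush, wait past ttl + ttlPushInterval, then confirm the remaining table count.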
diff --git a/tests/system-test/0-others/ttlChangeOnWrite.py b/tests/system-test/0-others/ttlChangeOnWrite.py new file mode 100644 index 0000000000..16c6585e07 --- /dev/null +++ b/tests/system-test/0-others/ttlChangeOnWrite.py @@ -0,0 +1,63 @@ +import time +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + + +class TDTestCase: + updatecfgDict = {'ttlUnit': 1, "ttlPushInterval": 3, "ttlChangeOnWrite": 1, "trimVDbIntervalSec": 360, + "ttlFlushThreshold": 100, "ttlBatchDropNum": 10} + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + self.ttl = 5 + self.tables = 100 + self.dbname = "test" + + def check_batch_drop_num(self): + tdSql.execute(f'create database {self.dbname} vgroups 1') + tdSql.execute(f'use {self.dbname}') + tdSql.execute(f'create table stb(ts timestamp, c1 int) tags(t1 int)') + for i in range(self.tables): + tdSql.execute(f'create table t{i} using stb tags({i}) ttl {self.ttl}') + + tdSql.execute(f'flush database {self.dbname}') + time.sleep(self.ttl + self.updatecfgDict['ttlPushInterval'] + 1) + tdSql.query('show tables') + tdSql.checkRows(90) + + def check_ttl_result(self): + tdSql.execute(f'drop database if exists {self.dbname}') + tdSql.execute(f'create database {self.dbname}') + tdSql.execute(f'create table {self.dbname}.t1(ts timestamp, c1 int)') + tdSql.execute(f'create table {self.dbname}.t2(ts timestamp, c1 int) ttl {self.ttl}') + tdSql.query(f'show {self.dbname}.tables') + tdSql.checkRows(2) + + tdSql.execute(f'flush database {self.dbname}') + time.sleep(self.ttl - 1) + tdSql.execute(f'insert into {self.dbname}.t2 values(now, 1)'); + + tdSql.execute(f'flush database {self.dbname}') + time.sleep(self.ttl - 1) + tdSql.query(f'show {self.dbname}.tables') + tdSql.checkRows(2) + + tdSql.execute(f'flush database {self.dbname}') + time.sleep(self.ttl * 2) + tdSql.query(f'show {self.dbname}.tables') + tdSql.checkRows(1) + + def run(self): + self.check_batch_drop_num() + self.check_ttl_result() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/1-insert/precisionUS.py b/tests/system-test/1-insert/precisionUS.py index 1b41d66010..d634149297 100644 --- a/tests/system-test/1-insert/precisionUS.py +++ b/tests/system-test/1-insert/precisionUS.py @@ -220,7 +220,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/1-insert/table_param_ttl.py b/tests/system-test/1-insert/table_param_ttl.py index 6cc978a76c..f36a49a1d7 100644 --- a/tests/system-test/1-insert/table_param_ttl.py +++ b/tests/system-test/1-insert/table_param_ttl.py @@ -35,6 +35,7 @@ class TDTestCase: tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}') tdSql.query(f'show db.tables') tdSql.checkRows(self.tbnum) + tdSql.execute(f'flush database db') sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1) tdSql.query(f'show db.tables') tdSql.checkRows(0) @@ -42,6 +43,7 @@ class TDTestCase: tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}') for i in 
range(int(self.tbnum/2)): tdSql.execute(f'alter table db.{self.ntbname}_{i} ttl {self.modify_ttl}') + tdSql.execute(f'flush database db') sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1) tdSql.query(f'show db.tables') tdSql.checkRows(self.tbnum - int(self.tbnum/2)) @@ -54,6 +56,7 @@ class TDTestCase: tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param}') tdSql.query(f'show db.tables') tdSql.checkRows(self.tbnum) + tdSql.execute(f'flush database db') sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1) tdSql.query(f'show db.tables') tdSql.checkRows(0) @@ -63,6 +66,7 @@ class TDTestCase: tdSql.checkRows(self.tbnum) for i in range(int(self.tbnum/2)): tdSql.execute(f'alter table db.{self.stbname}_{i} ttl {self.modify_ttl}') + tdSql.execute(f'flush database db') sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1) tdSql.query(f'show db.tables') tdSql.checkRows(self.tbnum - int(self.tbnum/2)) @@ -75,6 +79,7 @@ class TDTestCase: tdSql.execute(f'insert into db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param} values(now,1)') tdSql.query(f'show db.tables') tdSql.checkRows(self.tbnum) + tdSql.execute(f'flush database db') sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1) tdSql.query(f'show db.tables') tdSql.checkRows(0) diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index c6f233eefa..0a2f750f93 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -16,6 +16,9 @@ class TDTestCase: self.perfix = 'dev' self.tables = 10 + def check_result(self): + for i in range(self.rowNum): + tdSql.checkData(i, 0, 1); def run(self): tdSql.prepare() @@ -179,11 +182,6 @@ class TDTestCase: tdSql.error(f"select diff(col8) from {dbname}.stb_1") tdSql.error(f"select diff(col9) from {dbname}.stb") tdSql.error(f"select diff(col9) from {dbname}.stb_1") - tdSql.error(f"select diff(col11) from {dbname}.stb_1") - tdSql.error(f"select diff(col12) from {dbname}.stb_1") - tdSql.error(f"select diff(col13) from {dbname}.stb_1") - tdSql.error(f"select diff(col14) from {dbname}.stb_1") - tdSql.error(f"select diff(col14) from {dbname}.stb_1") tdSql.error(f"select diff(col1,col1,col1) from {dbname}.stb_1") tdSql.error(f"select diff(col1,1,col1) from {dbname}.stb_1") tdSql.error(f"select diff(col1,col1,col) from {dbname}.stb_1") @@ -217,6 +215,22 @@ class TDTestCase: tdSql.query(f"select diff(col6) from {dbname}.stb_1") tdSql.checkRows(10) + tdSql.query(f"select diff(col11) from {dbname}.stb_1") + tdSql.checkRows(10) + self.check_result() + + tdSql.query(f"select diff(col12) from {dbname}.stb_1") + tdSql.checkRows(10) + self.check_result() + + tdSql.query(f"select diff(col13) from {dbname}.stb_1") + tdSql.checkRows(10) + self.check_result() + + tdSql.query(f"select diff(col14) from {dbname}.stb_1") + tdSql.checkRows(10) + self.check_result() + tdSql.execute(f'''create table {dbname}.stb1(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''') tdSql.execute(f"create table {dbname}.stb1_1 using {dbname}.stb tags('shanghai')") diff --git a/tests/system-test/2-query/fill_with_group.py b/tests/system-test/2-query/fill_with_group.py index 
393102c8ed..b442647ff4 100644 --- a/tests/system-test/2-query/fill_with_group.py +++ b/tests/system-test/2-query/fill_with_group.py @@ -137,6 +137,12 @@ class TDTestCase: sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart desc" tdSql.query(sql) tdSql.checkRows(1) + sql = "select _wstart, count(*) from meters where ts > '2018-08-20 00:00:00.000' and ts < '2018-09-30 00:00:00.000' interval(9d) fill(NULL) order by _wstart desc;" + tdSql.query(sql) + tdSql.checkRows(6) + sql = "select _wstart, count(*) from meters where ts > '2018-08-20 00:00:00.000' and ts < '2018-09-30 00:00:00.000' interval(9d) fill(NULL) order by _wstart;" + tdSql.query(sql) + tdSql.checkRows(6) def run(self): self.prepareTestEnv() diff --git a/tests/system-test/2-query/interval_limit_opt.py b/tests/system-test/2-query/interval_limit_opt.py index fef6e9facd..851138fed3 100644 --- a/tests/system-test/2-query/interval_limit_opt.py +++ b/tests/system-test/2-query/interval_limit_opt.py @@ -251,10 +251,19 @@ class TDTestCase: tdSql.checkData(2, 4, 9) tdSql.checkData(3, 4, 9) + def test_partition_by_limit_no_agg(self): + sql_template = 'select t1 from meters partition by t1 limit %d' + + for i in range(1, 5000, 1000): + tdSql.query(sql_template % i) + tdSql.checkRows(5 * i) + + def run(self): self.prepareTestEnv() self.test_interval_limit_offset() self.test_interval_partition_by_slimit_limit() + self.test_partition_by_limit_no_agg() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/orderBy.py b/tests/system-test/2-query/orderBy.py index fed1651b3a..fa447cbca4 100644 --- a/tests/system-test/2-query/orderBy.py +++ b/tests/system-test/2-query/orderBy.py @@ -220,7 +220,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/2-query/smaBasic.py b/tests/system-test/2-query/smaBasic.py index c221a70605..a82d190e2b 100644 --- a/tests/system-test/2-query/smaBasic.py +++ b/tests/system-test/2-query/smaBasic.py @@ -269,7 +269,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 7f972d857e..087e5a7c62 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -233,7 +233,7 @@ class TMQCom: #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): rowsBatched = 0 - sql += " %s%d values "%(stbName,i) + sql += " %s.%s%d values "%(dbName, stbName, i) for j in range(rowsPerTbl): sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) rowsBatched += 1 @@ -241,7 +241,7 @@ class TMQCom: tsql.execute(sql) rowsBatched = 0 if j < rowsPerTbl - 1: - sql = "insert into %s%d values " %(stbName,i) + sql = "insert into %s.%s%d values " %(dbName, stbName,i) else: sql = "insert into " #end sql @@ -263,7 +263,7 @@ class TMQCom: #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): rowsBatched = 0 - sql += " %s%d values "%(ctbPrefix,i) + sql += " %s.%s%d values "%(dbName, ctbPrefix,i) for j in 
range(rowsPerTbl): if (j % 2 == 0): sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j) @@ -274,7 +274,7 @@ class TMQCom: tsql.execute(sql) rowsBatched = 0 if j < rowsPerTbl - 1: - sql = "insert into %s%d values " %(ctbPrefix,i) + sql = "insert into %s.%s%d values " %(dbName, ctbPrefix, i) else: sql = "insert into " #end sql @@ -296,7 +296,7 @@ class TMQCom: #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): rowsBatched = 0 - sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx) + sql += " %s.%s%d values "%(dbName, ctbPrefix, i+ctbStartIdx) for j in range(rowsPerTbl): if (j % 2 == 0): sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j) @@ -307,7 +307,7 @@ class TMQCom: tsql.execute(sql) rowsBatched = 0 if j < rowsPerTbl - 1: - sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx) + sql = "insert into %s.%s%d values " %(dbName, ctbPrefix, i+ctbStartIdx) else: sql = "insert into " #end sql diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 1c50e5bbbe..e0dc426004 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -35,6 +35,7 @@ from util.taosadapter import * import taos import taosrest +import taosws def checkRunTimeError(): import win32gui @@ -105,12 +106,13 @@ if __name__ == "__main__": queryPolicy = 1 createDnodeNums = 1 restful = False + websocket = False replicaVar = 1 asan = False independentMnode = True previousCluster = False - opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RD:n:i:aP', [ - 'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','adaptercfgupdate','replicaVar','independentMnode','previous']) + opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWD:n:i:aP', [ + 'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode','previous']) for key, value in opts: if key in ['-h', '--help']: tdLog.printNoPrefix( @@ -131,6 +133,7 @@ if __name__ == "__main__": tdLog.printNoPrefix('-Q set queryPolicy in one dnode') tdLog.printNoPrefix('-C create Dnode Numbers in one cluster') tdLog.printNoPrefix('-R restful realization form') + tdLog.printNoPrefix('-W websocket connection') tdLog.printNoPrefix('-D taosadapter update cfg dict ') tdLog.printNoPrefix('-n the number of replicas') tdLog.printNoPrefix('-i independentMnode Mnode') @@ -177,7 +180,7 @@ if __name__ == "__main__": sys.exit(0) if key in ['-k', '--killValgrind']: - killValgrind = 0 + killValgrind = 1 if key in ['-e', '--execCmd']: try: @@ -203,6 +206,9 @@ if __name__ == "__main__": if key in ['-R', '--restful']: restful = True + + if key in ['-W', '--websocket']: + websocket = True if key in ['-a', '--asan']: asan = True @@ -224,7 +230,7 @@ if __name__ == "__main__": # do exeCmd command # if not execCmd == "": - if restful: + if restful or websocket: tAdapter.init(deployPath) else: tdDnodes.init(deployPath) @@ -263,7 +269,7 @@ if __name__ == "__main__": if valgrind: time.sleep(2) - if restful: + if restful or websocket: toBeKilled = "taosadapter" # killCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1" % toBeKilled @@ -358,7 +364,7 @@ if __name__ == "__main__": tdDnodes.deploy(1,updateCfgDict) 
tdDnodes.start(1) tdCases.logSql(logSql) - if restful: + if restful or websocket: tAdapter.deploy(adapter_cfg_dict) tAdapter.start() @@ -366,6 +372,8 @@ if __name__ == "__main__": queryPolicy=int(queryPolicy) if restful: conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") else: conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) @@ -395,14 +403,16 @@ if __name__ == "__main__": tdDnodes.starttaosd(dnode.index) tdCases.logSql(logSql) - if restful: + if restful or websocket: tAdapter.deploy(adapter_cfg_dict) tAdapter.start() - if not restful: - conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) - else: + if restful: conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") + else: + conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) # tdLog.info(tdDnodes.getSimCfgPath(),host) if createDnodeNums == 1: createDnodeNums=dnodeNums @@ -419,6 +429,8 @@ if __name__ == "__main__": queryPolicy=int(queryPolicy) if restful: conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") else: conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) @@ -438,10 +450,12 @@ if __name__ == "__main__": if ucase is not None and hasattr(ucase, 'noConn') and ucase.noConn == True: conn = None else: - if not restful: - conn = taos.connect(host="%s"%(host), config=tdDnodes.sim.getCfgDir()) + if restful: + conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") else: - conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) if testCluster: tdLog.info("Procedures for testing cluster") @@ -451,10 +465,12 @@ if __name__ == "__main__": tdCases.runOneCluster(fileName) else: tdLog.info("Procedures for testing self-deployment") - if not restful: - conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) + if restful: + conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") else: - conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) if fileName == "all": tdCases.runAllWindows(conn) @@ -470,10 +486,12 @@ if __name__ == "__main__": tdDnodes.stopAll() tdDnodes.start(1) time.sleep(1) - if not restful: - conn = taos.connect( host, config=tdDnodes.getSimCfgPath()) - else: + if restful: conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") + else: + conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) tdLog.info("Procedures for tdengine deployed in %s" % (host)) tdLog.info("query test after taosd restart") tdCases.runOneWindows(conn, sp[0] + "_" + "restart.py", replicaVar) @@ -505,7 +523,7 @@ if __name__ == "__main__": except: pass - if restful: + if restful or websocket: tAdapter.init(deployPath, masterIp) tAdapter.stop(force_kill=True) @@ -515,16 +533,18 @@ if __name__ == "__main__": tdDnodes.start(1) tdCases.logSql(logSql) - if restful: + if restful or websocket: tAdapter.deploy(adapter_cfg_dict) tAdapter.start() if queryPolicy != 1: queryPolicy=int(queryPolicy) - if not restful: - conn = 
taos.connect(host,config=tdDnodes.getSimCfgPath()) - else: + if restful: conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") + else: + conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) # tdSql.init(conn.cursor()) # tdSql.execute("create qnode on dnode 1") # tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy) @@ -567,15 +587,17 @@ if __name__ == "__main__": tdDnodes.starttaosd(dnode.index) tdCases.logSql(logSql) - if restful: + if restful or websocket: tAdapter.deploy(adapter_cfg_dict) tAdapter.start() # create taos connect - if not restful: - conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) + if restful: + conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") else: - conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) print(tdDnodes.getSimCfgPath(),host) if createDnodeNums == 1: createDnodeNums=dnodeNums @@ -595,8 +617,10 @@ if __name__ == "__main__": queryPolicy=int(queryPolicy) if restful: conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") else: - conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) + conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) cursor = conn.cursor() cursor.execute("create qnode on dnode 1") @@ -621,10 +645,12 @@ if __name__ == "__main__": tdCases.runOneCluster(fileName) else: tdLog.info("Procedures for testing self-deployment") - if not restful: - conn = taos.connect(host,config=tdDnodes.getSimCfgPath()) + if restful: + conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") else: - conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) if fileName == "all": tdCases.runAllLinux(conn) @@ -641,10 +667,12 @@ if __name__ == "__main__": tdDnodes.stopAll() tdDnodes.start(1) time.sleep(1) - if not restful: - conn = taos.connect( host, config=tdDnodes.getSimCfgPath()) - else: + if restful: conn = taosrest.connect(url=f"http://{host}:6041",timezone="utc") + elif websocket: + conn = taosws.connect(f"taosws://root:taosdata@{host}:6041") + else: + conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath()) tdLog.info("Procedures for tdengine deployed in %s" % (host)) tdLog.info("query test after taosd restart") tdCases.runOneLinux(conn, sp[0] + "_" + "restart.py", replicaVar) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 72cf3cd1cc..a7f79fc9db 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -28,12 +28,12 @@ static void shellRecordCommandToHistory(char *command); static int32_t shellRunCommand(char *command, bool recordHistory); static void shellRunSingleCommandImp(char *command); static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision); -static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); +static int64_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); static void shellPrintNChar(const char *str, int32_t length, int32_t width); static void shellPrintGeometry(const unsigned char *str, int32_t length, int32_t width); -static int32_t 
shellVerticalPrintResult(TAOS_RES *tres, const char *sql); -static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql); -static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql); +static int64_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql); +static int64_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql); +static int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql); static void shellReadHistory(); static void shellWriteHistory(); static void shellPrintError(TAOS_RES *tres, int64_t st); @@ -238,14 +238,14 @@ void shellRunSingleCommandImp(char *command) { if (pFields != NULL) { // select and show kinds of commands int32_t error_no = 0; - int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command); + int64_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command); if (numOfRows < 0) return; et = taosGetTimestampUs(); if (error_no == 0) { - printf("Query OK, %d row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6); + printf("Query OK, %"PRId64 " row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6); } else { - printf("Query interrupted (%s), %d row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); + printf("Query interrupted (%s), %"PRId64 " row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); } taos_free_result(pSql); } else { @@ -430,7 +430,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i } } -int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) { +int64_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) { char fullname[PATH_MAX] = {0}; if (taosExpandDir(fname, fullname, PATH_MAX) != 0) { tstrncpy(fullname, fname, PATH_MAX); @@ -459,7 +459,7 @@ int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) { } taosFprintfFile(pFile, "\r\n"); - int32_t numOfRows = 0; + int64_t numOfRows = 0; do { int32_t *length = taos_fetch_lengths(tres); for (int32_t i = 0; i < num_fields; i++) { @@ -702,7 +702,7 @@ bool shellIsShowQuery(const char *sql) { return false; } -int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { +int64_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { return 0; @@ -726,11 +726,11 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; } - int32_t numOfRows = 0; + int64_t numOfRows = 0; int32_t showMore = 1; do { if (numOfRows < resShowMaxNum) { - printf("*************************** %d.row ***************************\r\n", numOfRows + 1); + printf("*************************** %"PRId64".row ***************************\r\n", numOfRows + 1); int32_t *length = taos_fetch_lengths(tres); @@ -856,7 +856,7 @@ void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields) { putchar('\n'); } -int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { +int64_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { return 0; @@ -879,7 +879,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; } - int32_t numOfRows = 0; + int64_t numOfRows = 0; int32_t showMore = 1; do { @@ -915,8 +915,8 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { return numOfRows; } -int32_t shellDumpResult(TAOS_RES 
*tres, char *fname, int32_t *error_no, bool vertical, const char *sql) { - int32_t numOfRows = 0; +int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) { + int64_t numOfRows = 0; if (fname != NULL) { numOfRows = shellDumpResultToFile(fname, tres); } else if (vertical) { diff --git a/utils/test/c/varbinary_test.c b/utils/test/c/varbinary_test.c index e29b94ad1f..1d432ffbba 100644 --- a/utils/test/c/varbinary_test.c +++ b/utils/test/c/varbinary_test.c @@ -157,10 +157,6 @@ void varbinary_sql_test() { taos_free_result(pRes); // string function test, not support - pRes = taos_query(taos, "select length(c2) from stb"); - ASSERT(taos_errno(pRes) != 0); - taos_free_result(pRes); - pRes = taos_query(taos, "select ltrim(c2) from stb"); ASSERT(taos_errno(pRes) != 0); taos_free_result(pRes); @@ -190,7 +186,7 @@ void varbinary_sql_test() { ASSERT(taos_errno(pRes) != 0); taos_free_result(pRes); - // support first/last/last_row/count/hyperloglog/sample/tail/mode + // support first/last/last_row/count/hyperloglog/sample/tail/mode/length pRes = taos_query(taos, "select first(c2) from stb"); ASSERT(taos_errno(pRes) == 0); taos_free_result(pRes); @@ -207,6 +203,10 @@ void varbinary_sql_test() { ASSERT(taos_errno(pRes) == 0); taos_free_result(pRes); + pRes = taos_query(taos, "select length(c2) from stb where c2 = '\\x7F8290'"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + pRes = taos_query(taos, "select cast(t2 as varbinary(16)) from stb order by ts"); while ((row = taos_fetch_row(pRes)) != NULL) { int32_t* length = taos_fetch_lengths(pRes);