diff --git a/cmake/version.inc b/cmake/version.inc index 52d62fca65..aa8a4b6463 100644 --- a/cmake/version.inc +++ b/cmake/version.inc @@ -4,7 +4,7 @@ PROJECT(TDengine) IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "2.0.3.0") + SET(TD_VER_NUMBER "2.0.4.0") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md b/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md index 3c7f6987c7..e0acaee137 100644 --- a/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md +++ b/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md @@ -1019,5 +1019,5 @@ SELECT AVG(current),MAX(current),LEASTSQUARES(current, start_val, step_val), PER - 表名最大长度为193,每行数据最大长度16k个字符 - 列名最大长度为65,最多允许1024列,最少需要2列,第一列必须是时间戳 - 标签最多允许128个,可以0个,标签总长度不超过16k个字符 -- SQL语句最大长度65480个字符,但可通过系统配置参数maxSQLLength修改,最长可配置为8M +- SQL语句最大长度65480个字符,但可通过系统配置参数maxSQLLength修改,最长可配置为1M - 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制 diff --git a/documentation20/webdocs/markdowndocs/architecture-ch.md b/documentation20/webdocs/markdowndocs/architecture-ch.md index a279875649..460e655f5f 100644 --- a/documentation20/webdocs/markdowndocs/architecture-ch.md +++ b/documentation20/webdocs/markdowndocs/architecture-ch.md @@ -67,7 +67,7 @@ TDengine 分布式架构的逻辑结构图如下:
图 1 TDengine架构示意图
一个完整的 TDengine 系统是运行在一到多个物理节点上的,逻辑上,它包含数据节点(dnode)、TDengine客户端(taosc)以及应用(app)。系统中存在一到多个数据节点,这些数据节点组成一个集群(cluster)。应用通过taosc的API与TDengine集群进行互动。下面对每个逻辑单元进行简要介绍。 -**物理节点(pnode):** pnode是一独立运行、拥有自己的计算、存储和网络能力的计算机,可以是安装有OS的物理机、虚拟机或容器。物理节点由其配置的 FQDN(Fully Qualified Domain Name)来标识。 +**物理节点(pnode):** pnode是一独立运行、拥有自己的计算、存储和网络能力的计算机,可以是安装有OS的物理机、虚拟机或容器。物理节点由其配置的 FQDN(Fully Qualified Domain Name)来标识。TDengine完全依赖FQDN来进行网络通讯,如果不了解FQDN,请看博文《[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)》。 **数据节点(dnode):** dnode 是 TDengine 服务器侧执行代码 taosd 在物理节点上的一个运行实例,一个工作的系统必须有至少一个数据节点。dnode包含零到多个逻辑的虚拟节点(VNODE),零或者至多一个逻辑的管理节点(mnode)。dnode在系统中的唯一标识由实例的End Point (EP )决定。EP是dnode所在物理节点的FQDN (Fully Qualified Domain Name)和系统所配置的网络端口号(Port)的组合。通过配置不同的端口,一个物理节点(一台物理机、虚拟机或容器)可以运行多个实例,或有多个数据节点。 diff --git a/documentation20/webdocs/markdowndocs/cluster-ch.md b/documentation20/webdocs/markdowndocs/cluster-ch.md index 193de729ba..c36819e5c7 100644 --- a/documentation20/webdocs/markdowndocs/cluster-ch.md +++ b/documentation20/webdocs/markdowndocs/cluster-ch.md @@ -2,13 +2,13 @@ 多个TDengine服务器,也就是多个taosd的运行实例可以组成一个集群,以保证TDengine的高可靠运行,并提供水平扩展能力。要了解TDengine 2.0的集群管理,需要对集群的基本概念有所了解,请看TDengine 2.0整体架构一章。而且在安装集群之前,先请按照[《立即开始》](https://www.taosdata.com/cn/getting-started20/)一章安装并体验单节点功能。 -集群的每个数据节点是由End Point来唯一标识的,End Point是由FQDN(Fully Qualified Domain Name)外加Port组成,比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname,可通过Linux命令`hostname -f`获取,FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。端口是这个数据节点对外服务的端口号,缺省是6030,但可以通过taos.cfg里配置参数serverPort进行修改。一个物理节点可能配置了多个hostname, TDengine会自动获取第一个,但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问,可以将参数fqdn设置为本节点的IP地址。 +集群的每个数据节点是由End Point来唯一标识的,End Point是由FQDN(Fully Qualified Domain Name)外加Port组成,比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname,可通过Linux命令`hostname -f`获取(如何配置FQDN,请参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html))。端口是这个数据节点对外服务的端口号,缺省是6030,但可以通过taos.cfg里配置参数serverPort进行修改。一个物理节点可能配置了多个hostname, TDengine会自动获取第一个,但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问,可以将参数fqdn设置为本节点的IP地址。 TDengine的集群管理极其简单,除添加和删除节点需要人工干预之外,其他全部是自动完成,最大程度的降低了运维的工作量。本章对集群管理的操作做详细的描述。 ## 准备工作 -**第零步**:规划集群所有物理节点的FQDN,将规划好的FQDN分别添加到每个物理节点的/etc/hostname;修改每个物理节点的/etc/hosts,将所有集群物理节点的IP与FQDN的对应添加好【如部署了DNS,请联系网络管理员在DNS上做好相关配置】; +**第零步**:如果没有部署DNS服务,请规划集群所有物理节点的FQDN,然后按照《[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)》里的步骤,将所有集群物理节点的IP与FQDN的对应关系添加好。 **第一步**:如果搭建集群的物理节点中,存有之前的测试数据、装过1.X的版本,或者装过其他版本的TDengine,请先将其删除,并清空所有数据,具体步骤请参考博客[《TDengine多种安装包的安装和卸载》](https://www.taosdata.com/blog/2019/08/09/566.html ) **注意1:**因为FQDN的信息会写进文件,如果之前没有配置或者更改FQDN,且启动了TDengine。请一定在确保数据无用或者备份的前提下,清理一下之前的数据(rm -rf /var/lib/taos/); @@ -22,7 +22,8 @@ TDengine的集群管理极其简单,除添加和删除节点需要人工干预 1. 每个物理节点上执行命令`hostname -f`,查看和确认所有节点的hostname是不相同的(应用驱动所在节点无需做此项检查); 2. 每个物理节点上执行`ping host`, 其中host是其他物理节点的hostname, 看能否ping通其它物理节点; 如果不能ping通,需要检查网络设置, 或/etc/hosts文件(Windows系统默认路径为C:\Windows\system32\drivers\etc\hosts),或DNS的配置。如果无法ping通,是无法组成集群的; -3. 每个数据节点的End Point就是输出的hostname外加端口号,比如h1.taosdata.com:6030 +3. 从应用运行的物理节点,ping taosd运行的数据节点,如果无法ping通,应用是无法连接taosd的,请检查应用所在物理节点的DNS设置或hosts文件; +4. 每个数据节点的End Point就是输出的hostname外加端口号,比如h1.taosdata.com:6030 **第五步**:修改TDengine的配置文件(所有节点的文件/etc/taos/taos.cfg都需要修改)。假设准备启动的第一个数据节点End Point为 h1.taosdata.com:6030, 其与集群配置相关参数如下: @@ -30,8 +31,8 @@ TDengine的集群管理极其简单,除添加和删除节点需要人工干预 // firstEp 是每个数据节点首次启动后连接的第一个数据节点 firstEp h1.taosdata.com:6030 -// 配置本数据节点的FQDN,如果本机只有一个hostname, 无需配置 -fqdn h1.taosdata.com +// 必须配置为本数据节点的FQDN,如果本机只有一个hostname, 可注释掉本配置 +fqdn h1.taosdata.com // 配置本数据节点的端口号,缺省是6030 serverPort 6030 @@ -40,7 +41,7 @@ serverPort 6030 arbitrator ha.taosdata.com:6042 ``` -一定要修改的参数是firstEp和fqdn, 其他参数可不做任何修改,除非你很清楚为什么要修改。 +一定要修改的参数是firstEp和fqdn。在每个数据节点,firstEp需全部配置成一样,**但fqdn一定要配置成其所在数据节点的值**。其他参数可不做任何修改,除非你很清楚为什么要修改。 **加入到集群中的数据节点dnode,涉及集群相关的下表11项参数必须完全相同,否则不能成功加入到集群中。** @@ -114,6 +115,8 @@ taos> ## 数据节点管理 +上面已经介绍如何从零开始搭建集群。集群组建完后,还可以随时添加新的数据节点进行扩容,或删除数据节点,并检查集群当前状态。 + ### 添加数据节点 执行CLI程序taos, 使用root账号登录进系统, 执行: diff --git a/documentation20/webdocs/markdowndocs/insert-ch.md b/documentation20/webdocs/markdowndocs/insert-ch.md index 5e4532dfd0..a84b577622 100644 --- a/documentation20/webdocs/markdowndocs/insert-ch.md +++ b/documentation20/webdocs/markdowndocs/insert-ch.md @@ -25,6 +25,7 @@ INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6, - 要提高写入效率,需要批量写入。一批写入的记录条数越多,插入效率就越高。但一条记录不能超过16K,一条SQL语句总长度不能超过64K(可通过参数maxSQLLength配置,最大可配置为8M)。 - TDengine支持多线程同时写入,要进一步提高写入速度,一个客户端需要打开20个以上的线程同时写。但线程数达到一定数量后,无法再提高,甚至还会下降,因为线程切频繁切换,带来额外开销。 - 对同一张表,如果新插入记录的时间戳已经存在,新记录将被直接抛弃,也就是说,在一张表里,时间戳必须是唯一的。如果应用自动生成记录,很有可能生成的时间戳是一样的,这样,成功插入的记录条数会小于应用插入的记录条数。 +- 写入的数据的时间戳必须大于当前时间减去配置参数keep的时间。如果keep配置为3650天,那么无法写入比3650天还老的数据。写入数据的时间戳也不能大于当前时间加配置参数days。如果days配置为2,那么无法写入比当前时间还晚2天的数据。 ## Prometheus直接写入 [Prometheus](https://www.prometheus.io/)作为Cloud Native Computing Fundation毕业的项目,在性能监控以及K8S性能监控领域有着非常广泛的应用。TDengine提供一个小工具[Bailongma](https://github.com/taosdata/Bailongma),只需在Prometheus做简单配置,无需任何代码,就可将Prometheus采集的数据直接写入TDengine,并按规则在TDengine自动创建库和相关表项。博文[用Docker容器快速搭建一个Devops监控Demo](https://www.taosdata.com/blog/2020/02/03/1189.html)即是采用bailongma将Prometheus和Telegraf的数据写入TDengine中的示例,可以参考。 diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index edeca86c17..be38a7af71 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -326,7 +326,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI } else if (functionId == TSDB_FUNC_LAST_ROW) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; - *interBytes = dataBytes + sizeof(SLastrowInfo); + *interBytes = dataBytes; } else { return TSDB_CODE_TSC_INVALID_SQL; } @@ -711,13 +711,16 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY en if (pCtx->aOutputBuf == NULL) { return BLK_DATA_ALL_NEEDED; } - - SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); - if (pInfo->hasResult != DATA_SET_FLAG) { - return BLK_DATA_ALL_NEEDED; - } else { // data in current block is not earlier than current result - return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; - } + + return BLK_DATA_ALL_NEEDED; + // TODO pCtx->aOutputBuf is the previous windowRes output buffer, not current unloaded block. so the following filter + // is invalid +// SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); +// if (pInfo->hasResult != DATA_SET_FLAG) { +// return BLK_DATA_ALL_NEEDED; +// } else { // data in current block is not earlier than current result +// return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; +// } } static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { @@ -730,12 +733,16 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end return BLK_DATA_ALL_NEEDED; } - SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); - if (pInfo->hasResult != DATA_SET_FLAG) { - return BLK_DATA_ALL_NEEDED; - } else { - return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; - } + return BLK_DATA_ALL_NEEDED; + // TODO pCtx->aOutputBuf is the previous windowRes output buffer, not current unloaded block. so the following filter + // is invalid + +// SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); +// if (pInfo->hasResult != DATA_SET_FLAG) { +// return BLK_DATA_ALL_NEEDED; +// } else { +// return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; +// } } ////////////////////////////////////////////////////////////////////////////////////////////// @@ -1836,8 +1843,10 @@ static void last_row_function(SQLFunctionCtx *pCtx) { pInfo1->hasResult = DATA_SET_FLAG; DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts); + } else { + DO_UPDATE_TAG_COLUMNS(pCtx, pCtx->ptsList[pCtx->size - 1]); } - + SET_VAL(pCtx, pCtx->size, 1); } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 620e8ea57a..c4ca6793ff 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -546,6 +546,10 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pSql->cmd.numOfParams = 0; pSql->cmd.batchSize = 0; + uint64_t handle = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); + T_REF_INC(pSql->pTscObj); + int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { // wait for the callback function to post the semaphore @@ -574,7 +578,7 @@ int taos_stmt_close(TAOS_STMT* stmt) { free(normal->sql); } - tscFreeSqlObj(pStmt->pSql); + taos_free_result(pStmt->pSql); free(pStmt); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4364659ca6..ce86353c3e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -351,7 +351,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { case TSDB_SQL_DESCRIBE_TABLE: { SStrToken* pToken = &pInfo->pDCLInfo->a[0]; const char* msg1 = "invalid table name"; - const char* msg2 = "table name is too long"; + const char* msg2 = "table name too long"; if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); @@ -410,7 +410,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { const char* msg3 = "name too long"; pCmd->command = pInfo->type; - // tDCLSQL* pDCL = pInfo->pDCLInfo; SUserInfo* pUser = &pInfo->pDCLInfo->user; SStrToken* pName = &pUser->user; @@ -773,7 +772,7 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql) { const char* msg1 = "name too long"; - const char* msg2 = "current database name is invalid"; + const char* msg2 = "current database or database name invalid"; SSqlCmd* pCmd = &pSql->cmd; int32_t code = TSDB_CODE_SUCCESS; @@ -4874,6 +4873,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { {"cDebugFlag", 10}, {"httpDebugFlag", 13}, {"qDebugflag", 10}, {"sdbDebugFlag", 12}, {"uDebugFlag", 10}, {"tsdbDebugFlag", 13}, {"sDebugflag", 10}, {"rpcDebugFlag", 12}, {"dDebugFlag", 10}, {"mqttDebugFlag", 13}, {"wDebugFlag", 10}, {"tmrDebugFlag", 12}, + {"cqDebugFlag", 11}, }; SStrToken* pOptionToken = &pOptions->a[1]; @@ -5288,9 +5288,12 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) { int32_t tagLength = 0; - size_t size = taosArrayGetSize(pQueryInfo->exprList); - + +//todo is 0?? + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); + for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_TAGPRJ || pExpr->functionId == TSDB_FUNC_TAG) { @@ -5302,8 +5305,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) { } } - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); + SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); @@ -5311,7 +5313,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) { !(pExpr->functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->colInfo.flag))) { SSchema* pColSchema = &pSchema[pExpr->colInfo.colIndex]; getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->functionId, (int32_t)pExpr->param[0].i64Key, &pExpr->resType, - &pExpr->resBytes, &pExpr->interBytes, tagLength, true); + &pExpr->resBytes, &pExpr->interBytes, tagLength, isSTable); } } } @@ -5322,7 +5324,7 @@ static int32_t doUpdateSqlFunctionForColPrj(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < size; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - if (pExpr->functionId == TSDB_FUNC_PRJ && (!TSDB_COL_IS_UD_COL(pExpr->colInfo.flag))) { + if (pExpr->functionId == TSDB_FUNC_PRJ && (!TSDB_COL_IS_UD_COL(pExpr->colInfo.flag) && (pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX))) { bool qualifiedCol = false; for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) { SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, j); @@ -5420,13 +5422,6 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd) int16_t numOfSelectivity = 0; int16_t numOfAggregation = 0; - // todo is 0?? - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - if (!isSTable) { - return TSDB_CODE_SUCCESS; - } - size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList); for (int32_t i = 0; i < numOfExprs; ++i) { SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, i); @@ -6303,6 +6298,11 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { } if (pQueryInfo->interval.interval > 0 && pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') { + bool initialWindows = TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER); + if (initialWindows) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); + } + int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey); // number of result is not greater than 10,000,000 if ((timeRange == 0) || (timeRange / pQueryInfo->interval.interval) > MAX_INTERVAL_TIME_WINDOW) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c6e9cbafd7..ddbee3106d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -188,8 +188,8 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { if (tscShouldFreeHeartBeat(pHB)) { tscDebug("%p free HB object and release connection", pHB); - tscFreeSqlObj(pHB); - tscCloseTscObj(pObj); + pObj->pHb = 0; + taos_free_result(pHB); } else { int32_t code = tscProcessSql(pHB); if (code != TSDB_CODE_SUCCESS) { @@ -1959,6 +1959,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { return 0; } +// TODO multithread problem static void createHBObj(STscObj* pObj) { if (pObj->pHb != NULL) { return; @@ -1987,10 +1988,13 @@ static void createHBObj(STscObj* pObj) { pSql->pTscObj = pObj; pSql->signature = pSql; pObj->pHb = pSql; - T_REF_INC(pObj); tscAddSubqueryInfo(&pObj->pHb->cmd); + int64_t ad = (int64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000); + T_REF_INC(pObj); + tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); } @@ -2017,8 +2021,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->connId = htonl(pConnect->connId); createHBObj(pObj); - -// taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); + taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 69bc69cd4a..8b79b0278b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -236,13 +236,21 @@ TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t us return taos_connect(ipBuf, userBuf, passBuf, dbBuf, port); } +static void asyncConnCallback(void *param, TAOS_RES *tres, int code) { + SSqlObj *pSql = (SSqlObj *) tres; + assert(pSql != NULL); + + pSql->fetchFp(pSql->param, tres, code); +} + TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { - SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, fp, param, taos); + SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, taos); if (pSql == NULL) { return NULL; } + pSql->fetchFp = fp; pSql->res.code = tscProcessSql(pSql); tscDebug("%p DB async connection is opening", taos); return taos; @@ -255,33 +263,17 @@ void taos_close(TAOS *taos) { return; } - if (pObj->pHb != NULL) { - if (pObj->pHb->pRpcCtx != NULL) { // wait for rsp from dnode - rpcCancelRequest(pObj->pHb->pRpcCtx); + SSqlObj* pHb = pObj->pHb; + if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { + if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode + rpcCancelRequest(pHb->pRpcCtx); + pHb->pRpcCtx = NULL; } - tscSetFreeHeatBeat(pObj); - tscFreeSqlObj(pObj->pHb); + tscDebug("%p, HB is freed", pHb); + taos_free_result(pHb); } - // free all sqlObjs created by using this connect before free the STscObj -// while(1) { -// pthread_mutex_lock(&pObj->mutex); -// void* p = pObj->sqlList; -// pthread_mutex_unlock(&pObj->mutex); -// -// if (p == NULL) { -// break; -// } -// -// tscDebug("%p waiting for sqlObj to be freed, %p", pObj, p); -// taosMsleep(100); -// -// // todo fix me!! two threads call taos_free_result will cause problem. -// tscDebug("%p free :%p", pObj, p); -// taos_free_result(p); -// } - int32_t ref = T_REF_DEC(pObj); assert(ref >= 0); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 4a1f4d9d87..fed9caebbe 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -515,6 +515,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { return; } + uint64_t handle = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); + T_REF_INC(pSql->pTscObj); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); @@ -568,6 +572,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p pStream->pSql = pSql; pSql->pStream = pStream; pSql->param = pStream; + pSql->maxRetry = TSDB_MAX_REPLICA; pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); if (pSql->sqlstr == NULL) { @@ -575,6 +580,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscFreeSqlObj(pSql); return NULL; } + strtolower(pSql->sqlstr, sqlstr); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); @@ -615,10 +621,9 @@ void taos_close_stream(TAOS_STREAM *handle) { tscDebug("%p stream:%p is closed", pSql, pStream); // notify CQ to release the pStream object pStream->fp(pStream->param, NULL, NULL); - - tscFreeSqlObj(pSql); pStream->pSql = NULL; + taos_free_result(pSql); taosTFree(pStream); } } diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 76bce19668..760c5f5a51 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -152,6 +152,10 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* goto fail; } + uint64_t handle = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); + T_REF_INC(pSql->pTscObj); + code = tsParseSql(pSql, false); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { tsem_wait(&pSub->sem); @@ -173,7 +177,11 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* fail: tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code)); if (pSql != NULL) { - tscFreeSqlObj(pSql); + if (pSql->self != NULL) { + taos_free_result(pSql); + } else { + tscFreeSqlObj(pSql); + } pSql = NULL; } if (pSub != NULL) { @@ -494,6 +502,10 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { } } + if (pSub->pSql != NULL) { + taos_free_result(pSub->pSql); + } + tscFreeSqlObj(pSub->pSql); taosArrayDestroy(pSub->progress); tsem_destroy(&pSub->sem); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index f9bb180810..e66b361191 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -278,7 +278,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscDebug("%p subIndex: %d, no need to launch query, ignore it", pSql, i); tscDestroyJoinSupporter(pSupporter); - tscFreeSqlObj(pPrevSub); + taos_free_result(pPrevSub); pSql->pSubs[i] = NULL; continue; @@ -1383,7 +1383,7 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState taosTFree(pSupport->localBuffer); taosTFree(pSupport); - tscFreeSqlObj(pSub); + taos_free_result(pSub); } free(pState); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 38fb63f18e..20c3bc2cb6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1743,8 +1743,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm } pNew->pTscObj = pSql->pTscObj; - T_REF_INC(pNew->pTscObj); - pNew->signature = pNew; SSqlCmd* pCmd = &pNew->cmd; @@ -1777,7 +1775,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL); T_REF_INC(pNew->pTscObj); - uint64_t p = (uint64_t) pNew; pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); return pNew; diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index b6a2146c13..d0f9cf4df6 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -175,6 +175,7 @@ extern int32_t rpcDebugFlag; extern int32_t odbcDebugFlag; extern int32_t qDebugFlag; extern int32_t wDebugFlag; +extern int32_t cqDebugFlag; extern int32_t debugFlag; #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index f03b0f85b4..59c1433c75 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -204,6 +204,7 @@ int32_t debugFlag = 0; int32_t sDebugFlag = 135; int32_t wDebugFlag = 135; int32_t tsdbDebugFlag = 131; +int32_t cqDebugFlag = 135; int32_t (*monitorStartSystemFp)() = NULL; void (*monitorStopSystemFp)() = NULL; @@ -223,12 +224,13 @@ void taosSetAllDebugFlag() { httpDebugFlag = debugFlag; mqttDebugFlag = debugFlag; monitorDebugFlag = debugFlag; + qDebugFlag = debugFlag; rpcDebugFlag = debugFlag; uDebugFlag = debugFlag; sDebugFlag = debugFlag; wDebugFlag = debugFlag; tsdbDebugFlag = debugFlag; - qDebugFlag = debugFlag; + cqDebugFlag = debugFlag; uInfo("all debug flag are set to %d", debugFlag); } } @@ -1220,6 +1222,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "cqDebugFlag"; + cfg.ptr = &cqDebugFlag; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG; + cfg.minValue = 0; + cfg.maxValue = 255; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "tscEnableRecordSql"; cfg.ptr = &tsTscEnableRecordSql; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/connector/go b/src/connector/go index 8c58c512b6..06ec30a0f1 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 +Subproject commit 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0 diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml index 0fed63fc80..36e2fa426b 100755 --- a/src/connector/jdbc/pom.xml +++ b/src/connector/jdbc/pom.xml @@ -1,101 +1,102 @@ - 4.0.0 - com.taosdata.jdbc - taos-jdbcdriver - 2.0.0 - jar - JDBCDriver - https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc - TDengine JDBC Driver - - - GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - https://github.com/taosdata/TDengine/blob/master/LICENSE - repo - - - - scm:git:git://github.com/taosdata/TDengine.git - scm:git:git@github.com:taosdata/TDengine.git + 4.0.0 + com.taosdata.jdbc + taos-jdbcdriver + 2.0.0 + jar + JDBCDriver https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc - HEAD - - - - taosdata - support@taosdata.com - https://www.taosdata.com/ - https://www.taosdata.com/ - - - - UTF-8 - 1.8 - 3.6.0 - 1.1.2 - 3.5 - - - - commons-logging - commons-logging - ${commons-logging.version} - - - * - * - - - - - junit - junit - 4.13 - test - - - - - - org.apache.maven.plugins - maven-assembly-plugin - 3.0.0 - - - src/main/assembly/assembly-jar.xml - - - - - make-assembly - package - - single - - - - - - org.apache.maven.plugins - maven-compiler-plugin - ${maven-compiler-plugin.version} - - UTF-8 - ${java.version} - ${java.version} - true - true - - - - org.apache.maven.plugins - maven-surefire-plugin - 2.12.4 - - true - - - - + + TDengine JDBC Driver + + + GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + https://github.com/taosdata/TDengine/blob/master/LICENSE + repo + + + + scm:git:git://github.com/taosdata/TDengine.git + scm:git:git@github.com:taosdata/TDengine.git + https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc + HEAD + + + + taosdata + support@taosdata.com + https://www.taosdata.com/ + https://www.taosdata.com/ + + + + UTF-8 + 1.8 + 3.6.0 + 1.1.2 + 3.5 + + + + commons-logging + commons-logging + ${commons-logging.version} + + + * + * + + + + + junit + junit + 4.13 + test + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.0.0 + + + src/main/assembly/assembly-jar.xml + + + + + make-assembly + package + + single + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven-compiler-plugin.version} + + UTF-8 + ${java.version} + ${java.version} + true + true + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + true + + + + diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java index d8df2fc0d3..ac0e4eb84a 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java @@ -53,66 +53,12 @@ public class TSDBConnection implements Connection { public TSDBConnection(Properties info, TSDBDatabaseMetaData meta) throws SQLException { this.dbMetaData = meta; - //load taos.cfg start - File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR)); - File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0]; - List endpoints = loadConfigEndpoints(cfgFile); - if (!endpoints.isEmpty()) { - info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]); - info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]); - } - //load taos.cfg end - connect(info.getProperty(TSDBDriver.PROPERTY_KEY_HOST), Integer.parseInt(info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "0")), info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME), info.getProperty(TSDBDriver.PROPERTY_KEY_USER), info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD)); } - private List loadConfigEndpoints(File cfgFile) { - List endpoints = new ArrayList<>(); - try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) { - String line = null; - while ((line = reader.readLine()) != null) { - if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) { - endpoints.add(line.substring(line.indexOf('p') + 1).trim()); - } - if (endpoints.size() > 1) - break; - } - } catch (FileNotFoundException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - return endpoints; - } - - /** - * @param cfgDirPath - * @return return the config dir - **/ - private File loadConfigDir(String cfgDirPath) { - if (cfgDirPath == null) - return loadDefaultConfigDir(); - File cfgDir = new File(cfgDirPath); - if (!cfgDir.exists()) - return loadDefaultConfigDir(); - return cfgDir; - } - - /** - * @return search the default config dir, if the config dir is not exist will return null - */ - private File loadDefaultConfigDir() { - File cfgDir; - File cfgDir_linux = new File("/etc/taos"); - cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null; - File cfgDir_windows = new File("C:\\TDengine\\cfg"); - cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir; - return cfgDir; - } - private void connect(String host, int port, String dbName, String user, String password) throws SQLException { this.connector = new TSDBJNIConnector(); this.connector.connect(host, port, dbName, user, password); diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDatabaseMetaData.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDatabaseMetaData.java index 804e09c6b3..15f66fa202 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDatabaseMetaData.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDatabaseMetaData.java @@ -68,15 +68,15 @@ public class TSDBDatabaseMetaData implements java.sql.DatabaseMetaData { } public boolean nullsAreSortedLow() throws SQLException { - return false; + return !nullsAreSortedHigh(); } public boolean nullsAreSortedAtStart() throws SQLException { - return false; + return true; } public boolean nullsAreSortedAtEnd() throws SQLException { - return false; + return !nullsAreSortedAtStart(); } public String getDatabaseProductName() throws SQLException { diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java index e25bd64c73..bc649a31e1 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBDriver.java @@ -14,24 +14,29 @@ *****************************************************************************/ package com.taosdata.jdbc; + +import java.io.*; + import java.sql.*; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import java.util.logging.Logger; /** * The Java SQL framework allows for multiple database drivers. Each driver * should supply a class that implements the Driver interface - * + * *

* The DriverManager will try to load as many drivers as it can find and then * for any given connection request, it will ask each driver in turn to try to * connect to the target URL. - * + * *

* It is strongly recommended that each Driver class should be small and stand * alone so that the Driver class can be loaded and queried without bringing in * vast quantities of supporting code. - * + * *

* When a Driver class is loaded, it should create an instance of itself and * register it with the DriverManager. This means that a user can load and @@ -39,38 +44,41 @@ import java.util.logging.Logger; */ public class TSDBDriver implements java.sql.Driver { - @Deprecated - private static final String URL_PREFIX1 = "jdbc:tsdb://"; - private static final String URL_PREFIX = "jdbc:taos://"; - /** - * Key used to retrieve the database value from the properties instance passed - * to the driver. - */ - public static final String PROPERTY_KEY_DBNAME = "dbname"; + @Deprecated + private static final String URL_PREFIX1 = "jdbc:TSDB://"; - /** - * Key used to retrieve the host value from the properties instance passed to - * the driver. - */ - public static final String PROPERTY_KEY_HOST = "host"; - /** - * Key used to retrieve the password value from the properties instance passed - * to the driver. - */ - public static final String PROPERTY_KEY_PASSWORD = "password"; + private static final String URL_PREFIX = "jdbc:TAOS://"; - /** - * Key used to retrieve the port number value from the properties instance - * passed to the driver. - */ - public static final String PROPERTY_KEY_PORT = "port"; + /** + * Key used to retrieve the database value from the properties instance passed + * to the driver. + */ + public static final String PROPERTY_KEY_DBNAME = "dbname"; + + /** + * Key used to retrieve the host value from the properties instance passed to + * the driver. + */ + public static final String PROPERTY_KEY_HOST = "host"; + /** + * Key used to retrieve the password value from the properties instance passed + * to the driver. + */ + public static final String PROPERTY_KEY_PASSWORD = "password"; + + /** + * Key used to retrieve the port number value from the properties instance + * passed to the driver. + */ + public static final String PROPERTY_KEY_PORT = "port"; + + /** + * Key used to retrieve the user value from the properties instance passed to + * the driver. + */ + public static final String PROPERTY_KEY_USER = "user"; - /** - * Key used to retrieve the user value from the properties instance passed to - * the driver. - */ - public static final String PROPERTY_KEY_USER = "user"; /** * Key for the configuration file directory of TSDB client in properties instance @@ -95,278 +103,320 @@ public class TSDBDriver implements java.sql.Driver { public static final String PROPERTY_KEY_PROTOCOL = "protocol"; - /** - * Index for port coming out of parseHostPortPair(). - */ - public final static int PORT_NUMBER_INDEX = 1; - /** - * Index for host coming out of parseHostPortPair(). - */ - public final static int HOST_NAME_INDEX = 0; + /** + * Index for port coming out of parseHostPortPair(). + */ + public final static int PORT_NUMBER_INDEX = 1; - private TSDBDatabaseMetaData dbMetaData = null; + /** + * Index for host coming out of parseHostPortPair(). + */ + public final static int HOST_NAME_INDEX = 0; - static { - try { - java.sql.DriverManager.registerDriver(new TSDBDriver()); - } catch (SQLException E) { - throw new RuntimeException(TSDBConstants.WrapErrMsg("can't register tdengine jdbc driver!")); - } - } + private TSDBDatabaseMetaData dbMetaData = null; - public Connection connect(String url, Properties info) throws SQLException { - if (url == null) { - throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!")); - } + static { + try { + java.sql.DriverManager.registerDriver(new TSDBDriver()); + } catch (SQLException E) { + throw new RuntimeException(TSDBConstants.WrapErrMsg("can't register tdengine jdbc driver!")); + } + } - Properties props = null; + private List loadConfigEndpoints(File cfgFile) { + List endpoints = new ArrayList<>(); + try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) { + String line = null; + while ((line = reader.readLine()) != null) { + if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) { + endpoints.add(line.substring(line.indexOf('p') + 1).trim()); + } + if (endpoints.size() > 1) + break; + } + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return endpoints; + } - if ((props = parseURL(url, info)) == null) { - return null; - } + /** + * @param cfgDirPath + * @return return the config dir + **/ + private File loadConfigDir(String cfgDirPath) { + if (cfgDirPath == null) + return loadDefaultConfigDir(); + File cfgDir = new File(cfgDirPath); + if (!cfgDir.exists()) + return loadDefaultConfigDir(); + return cfgDir; + } - try { - TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE), (String) props.get(PROPERTY_KEY_CHARSET), - (String) props.get(PROPERTY_KEY_TIME_ZONE)); - Connection newConn = new TSDBConnection(props, this.dbMetaData); - return newConn; - } catch (SQLWarning sqlWarning) { - sqlWarning.printStackTrace(); - Connection newConn = new TSDBConnection(props, this.dbMetaData); - return newConn; - } catch (SQLException sqlEx) { - throw sqlEx; - } catch (Exception ex) { - SQLException sqlEx = new SQLException("SQLException:" + ex.toString()); - sqlEx.initCause(ex); - throw sqlEx; - } - } + /** + * @return search the default config dir, if the config dir is not exist will return null + */ + private File loadDefaultConfigDir() { + File cfgDir; + File cfgDir_linux = new File("/etc/taos"); + cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null; + File cfgDir_windows = new File("C:\\TDengine\\cfg"); + cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir; + return cfgDir; + } - /** - * Parses hostPortPair in the form of [host][:port] into an array, with the - * element of index HOST_NAME_INDEX being the host (or null if not specified), - * and the element of index PORT_NUMBER_INDEX being the port (or null if not - * specified). - * - * @param hostPortPair - * host and port in form of of [host][:port] - * - * @return array containing host and port as Strings - * - * @throws SQLException - * if a parse error occurs - */ - protected static String[] parseHostPortPair(String hostPortPair) throws SQLException { - String[] splitValues = new String[2]; + public Connection connect(String url, Properties info) throws SQLException { + if (url == null) { + throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!")); + } - int portIndex = hostPortPair.indexOf(":"); + Properties props = null; + if ((props = parseURL(url, info)) == null) { + return null; + } - String hostname = null; + //load taos.cfg start + if (info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null && info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null){ + File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR)); + File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0]; + List endpoints = loadConfigEndpoints(cfgFile); + if (!endpoints.isEmpty()) { + info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]); + info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]); + } + } - if (portIndex != -1) { - if ((portIndex + 1) < hostPortPair.length()) { - String portAsString = hostPortPair.substring(portIndex + 1); - hostname = hostPortPair.substring(0, portIndex); + try { + TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE), (String) props.get(PROPERTY_KEY_CHARSET), + (String) props.get(PROPERTY_KEY_TIME_ZONE)); + Connection newConn = new TSDBConnection(props, this.dbMetaData); + return newConn; + } catch (SQLWarning sqlWarning) { + sqlWarning.printStackTrace(); + Connection newConn = new TSDBConnection(props, this.dbMetaData); + return newConn; + } catch (SQLException sqlEx) { + throw sqlEx; + } catch (Exception ex) { + SQLException sqlEx = new SQLException("SQLException:" + ex.toString()); + sqlEx.initCause(ex); + throw sqlEx; + } + } - splitValues[HOST_NAME_INDEX] = hostname; + /** + * Parses hostPortPair in the form of [host][:port] into an array, with the + * element of index HOST_NAME_INDEX being the host (or null if not specified), + * and the element of index PORT_NUMBER_INDEX being the port (or null if not + * specified). + * + * @param hostPortPair host and port in form of of [host][:port] + * @return array containing host and port as Strings + * @throws SQLException if a parse error occurs + */ + protected static String[] parseHostPortPair(String hostPortPair) throws SQLException { + String[] splitValues = new String[2]; - splitValues[PORT_NUMBER_INDEX] = portAsString; - } else { - throw new SQLException(TSDBConstants.WrapErrMsg("port is not proper!")); - } - } else { - splitValues[HOST_NAME_INDEX] = hostPortPair; - splitValues[PORT_NUMBER_INDEX] = null; - } + int portIndex = hostPortPair.indexOf(":"); - return splitValues; - } + String hostname = null; - public boolean acceptsURL(String url) throws SQLException { - return (url != null && url.length() > 0 && url.trim().length() > 0) && url.toLowerCase().startsWith(URL_PREFIX); - } + if (portIndex != -1) { + if ((portIndex + 1) < hostPortPair.length()) { + String portAsString = hostPortPair.substring(portIndex + 1); + hostname = hostPortPair.substring(0, portIndex); - public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { - if (info == null) { - info = new Properties(); - } + splitValues[HOST_NAME_INDEX] = hostname; - if ((url != null) && (url.startsWith(URL_PREFIX) || url.startsWith(URL_PREFIX1))) { - info = parseURL(url, info); - } + splitValues[PORT_NUMBER_INDEX] = portAsString; + } else { + throw new SQLException(TSDBConstants.WrapErrMsg("port is not proper!")); + } + } else { + splitValues[HOST_NAME_INDEX] = hostPortPair; + splitValues[PORT_NUMBER_INDEX] = null; + } - DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST)); - hostProp.required = true; + return splitValues; + } - DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, - info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT)); - portProp.required = false; + public boolean acceptsURL(String url) throws SQLException { + return (url != null && url.length() > 0 && url.trim().length() > 0) && url.toLowerCase().startsWith(URL_PREFIX); + } - DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME)); - dbProp.required = false; - dbProp.description = "Database name"; + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + if (info == null) { + info = new Properties(); + } - DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER)); - userProp.required = true; + if ((url != null) && (url.startsWith(URL_PREFIX) || url.startsWith(URL_PREFIX1))) { + info = parseURL(url, info); + } - DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, - info.getProperty(PROPERTY_KEY_PASSWORD)); - passwordProp.required = true; + DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST)); + hostProp.required = true; - DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5]; - propertyInfo[0] = hostProp; - propertyInfo[1] = portProp; - propertyInfo[2] = dbProp; - propertyInfo[3] = userProp; - propertyInfo[4] = passwordProp; + DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT)); + portProp.required = false; - return propertyInfo; - } + DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME)); + dbProp.required = false; + dbProp.description = "Database name"; - /** - * example: jdbc:TSDB://127.0.0.1:0/db?user=root&password=your_password - */ + DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER)); + userProp.required = true; - public Properties parseURL(String url, Properties defaults) throws java.sql.SQLException { - Properties urlProps = (defaults != null) ? defaults : new Properties(); - if (url == null) { - return null; - } + DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, info.getProperty(PROPERTY_KEY_PASSWORD)); + passwordProp.required = true; + + DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5]; + propertyInfo[0] = hostProp; + propertyInfo[1] = portProp; + propertyInfo[2] = dbProp; + propertyInfo[3] = userProp; + propertyInfo[4] = passwordProp; + + return propertyInfo; + } + + /** + * example: jdbc:TSDB://127.0.0.1:0/db?user=root&password=your_password + */ + public Properties parseURL(String url, Properties defaults) throws java.sql.SQLException { + Properties urlProps = (defaults != null) ? defaults : new Properties(); + if (url == null) { + return null; + } String lowerUrl = url.toLowerCase(); if (!lowerUrl.startsWith(URL_PREFIX) && !lowerUrl.startsWith(URL_PREFIX1)) { - return null; - } + return null; + } - String urlForMeta = url; + String urlForMeta = url; - String dbProductName = url.substring(url.indexOf(":") + 1); - dbProductName = dbProductName.substring(0, dbProductName.indexOf(":")); - int beginningOfSlashes = url.indexOf("//"); - url = url.substring(beginningOfSlashes + 2); + String dbProductName = url.substring(url.indexOf(":") + 1); + dbProductName = dbProductName.substring(0, dbProductName.indexOf(":")); + int beginningOfSlashes = url.indexOf("//"); + url = url.substring(beginningOfSlashes + 2); - String host = url.substring(0, url.indexOf(":")); - url = url.substring(url.indexOf(":") + 1); - urlProps.setProperty(PROPERTY_KEY_HOST, host); + String host = url.substring(0, url.indexOf(":")); + url = url.substring(url.indexOf(":") + 1); + urlProps.setProperty(PROPERTY_KEY_HOST, host); - String port = url.substring(0, url.indexOf("/")); - urlProps.setProperty(PROPERTY_KEY_PORT, port); - url = url.substring(url.indexOf("/") + 1); + String port = url.substring(0, url.indexOf("/")); + urlProps.setProperty(PROPERTY_KEY_PORT, port); + url = url.substring(url.indexOf("/") + 1); - if (url.indexOf("?") != -1) { - String dbName = url.substring(0, url.indexOf("?")); - urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName); - url = url.trim().substring(url.indexOf("?") + 1); - } else { - // without user & password so return - if(!url.trim().isEmpty()) { - String dbName = url.trim(); - urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName); - } - this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty("user")); - return urlProps; - } + if (url.indexOf("?") != -1) { + String dbName = url.substring(0, url.indexOf("?")); + urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName); + url = url.trim().substring(url.indexOf("?") + 1); + } else { + // without user & password so return + if (!url.trim().isEmpty()) { + String dbName = url.trim(); + urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName); + } + this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty("user")); + return urlProps; + } - String user = ""; + String user = ""; - if (url.indexOf("&") == -1) { - String[] kvPair = url.trim().split("="); - if (kvPair.length == 2) { - setPropertyValue(urlProps, kvPair); - return urlProps; - } - } + if (url.indexOf("&") == -1) { + String[] kvPair = url.trim().split("="); + if (kvPair.length == 2) { + setPropertyValue(urlProps, kvPair); + return urlProps; + } + } - String[] queryStrings = url.trim().split("&"); - for (String queryStr : queryStrings) { - String[] kvPair = queryStr.trim().split("="); - if (kvPair.length < 2){ - continue; - } - setPropertyValue(urlProps, kvPair); - } + String[] queryStrings = url.trim().split("&"); + for (String queryStr : queryStrings) { + String[] kvPair = queryStr.trim().split("="); + if (kvPair.length < 2) { + continue; + } + setPropertyValue(urlProps, kvPair); + } - user = urlProps.getProperty(PROPERTY_KEY_USER).toString(); - this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user); + user = urlProps.getProperty(PROPERTY_KEY_USER).toString(); + this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user); - return urlProps; - } + return urlProps; + } - public void setPropertyValue(Properties property, String[] keyValuePair) { - switch (keyValuePair[0].toLowerCase()) { - case PROPERTY_KEY_USER: - property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]); - break; - case PROPERTY_KEY_PASSWORD: - property.setProperty(PROPERTY_KEY_PASSWORD, keyValuePair[1]); - break; - case PROPERTY_KEY_TIME_ZONE: - property.setProperty(PROPERTY_KEY_TIME_ZONE, keyValuePair[1]); - break; - case PROPERTY_KEY_LOCALE: - property.setProperty(PROPERTY_KEY_LOCALE, keyValuePair[1]); - break; - case PROPERTY_KEY_CHARSET: - property.setProperty(PROPERTY_KEY_CHARSET, keyValuePair[1]); - break; - case PROPERTY_KEY_CONFIG_DIR: - property.setProperty(PROPERTY_KEY_CONFIG_DIR, keyValuePair[1]); - break; - } - } + public void setPropertyValue(Properties property, String[] keyValuePair) { + switch (keyValuePair[0].toLowerCase()) { + case PROPERTY_KEY_USER: + property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]); + break; + case PROPERTY_KEY_PASSWORD: + property.setProperty(PROPERTY_KEY_PASSWORD, keyValuePair[1]); + break; + case PROPERTY_KEY_TIME_ZONE: + property.setProperty(PROPERTY_KEY_TIME_ZONE, keyValuePair[1]); + break; + case PROPERTY_KEY_LOCALE: + property.setProperty(PROPERTY_KEY_LOCALE, keyValuePair[1]); + break; + case PROPERTY_KEY_CHARSET: + property.setProperty(PROPERTY_KEY_CHARSET, keyValuePair[1]); + break; + case PROPERTY_KEY_CONFIG_DIR: + property.setProperty(PROPERTY_KEY_CONFIG_DIR, keyValuePair[1]); + break; + } + } - public int getMajorVersion() { - return 1; - } + public int getMajorVersion() { + return 1; + } - public int getMinorVersion() { - return 1; - } + public int getMinorVersion() { + return 1; + } - public boolean jdbcCompliant() { - return false; - } + public boolean jdbcCompliant() { + return false; + } - public Logger getParentLogger() throws SQLFeatureNotSupportedException { - return null; - } + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } - /** - * Returns the host property - * - * @param props - * the java.util.Properties instance to retrieve the hostname from. - * - * @return the host - */ - public String host(Properties props) { - return props.getProperty(PROPERTY_KEY_HOST, "localhost"); - } + /** + * Returns the host property + * + * @param props the java.util.Properties instance to retrieve the hostname from. + * @return the host + */ + public String host(Properties props) { + return props.getProperty(PROPERTY_KEY_HOST, "localhost"); + } - /** - * Returns the port number property - * - * @param props - * the properties to get the port number from - * - * @return the port number - */ - public int port(Properties props) { - return Integer.parseInt(props.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT)); - } + /** + * Returns the port number property + * + * @param props the properties to get the port number from + * @return the port number + */ + public int port(Properties props) { + return Integer.parseInt(props.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT)); + } - /** - * Returns the database property from props - * - * @param props - * the Properties to look for the database property. - * - * @return the database name. - */ - public String database(Properties props) { - return props.getProperty(PROPERTY_KEY_DBNAME); - } + /** + * Returns the database property from props + * + * @param props the Properties to look for the database property. + * @return the database name. + */ + public String database(Properties props) { + return props.getProperty(PROPERTY_KEY_DBNAME); + } } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java index a8d6ceb713..5c6b0545e9 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBStatement.java @@ -242,7 +242,7 @@ public class TSDBStatement implements Statement { public void addBatch(String sql) throws SQLException { if (batchedArgs == null) { - batchedArgs = new ArrayList(); + batchedArgs = new ArrayList<>(); } batchedArgs.add(sql); } diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt index db366639ef..e631397348 100644 --- a/src/cq/CMakeLists.txt +++ b/src/cq/CMakeLists.txt @@ -2,6 +2,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) IF (TD_LINUX) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 758d620e57..1a99a84b8e 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -21,6 +21,7 @@ #include #include "taos.h" +#include "tsclient.h" #include "taosdef.h" #include "taosmsg.h" #include "ttimer.h" @@ -30,10 +31,12 @@ #include "tlog.h" #include "twal.h" -#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__); }} -#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__); }} +#define cFatal(...) { if (cqDebugFlag & DEBUG_FATAL) { taosPrintLog("CQ FATAL ", 255, __VA_ARGS__); }} +#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("CQ ERROR ", 255, __VA_ARGS__); }} +#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("CQ WARN ", 255, __VA_ARGS__); }} +#define cInfo(...) { if (cqDebugFlag & DEBUG_INFO) { taosPrintLog("CQ ", 255, __VA_ARGS__); }} +#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }} #define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }} -#define cPrint(...) { taosPrintLog("CQ ", 255, __VA_ARGS__); } typedef struct { int vgId; @@ -63,8 +66,6 @@ typedef struct SCqObj { SCqContext * pContext; } SCqObj; -int cqDebugFlag = 135; - static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); @@ -94,7 +95,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { pthread_mutex_init(&pContext->mutex, NULL); - cTrace("vgId:%d, CQ is opened", pContext->vgId); + cInfo("vgId:%d, CQ is opened", pContext->vgId); return pContext; } @@ -125,7 +126,7 @@ void cqClose(void *handle) { taosTmrCleanUp(pContext->tmrCtrl); pContext->tmrCtrl = NULL; - cTrace("vgId:%d, CQ is closed", pContext->vgId); + cInfo("vgId:%d, CQ is closed", pContext->vgId); free(pContext); } @@ -133,7 +134,7 @@ void cqStart(void *handle) { SCqContext *pContext = handle; if (pContext->dbConn || pContext->master) return; - cTrace("vgId:%d, start all CQs", pContext->vgId); + cInfo("vgId:%d, start all CQs", pContext->vgId); pthread_mutex_lock(&pContext->mutex); pContext->master = 1; @@ -149,7 +150,7 @@ void cqStart(void *handle) { void cqStop(void *handle) { SCqContext *pContext = handle; - cTrace("vgId:%d, stop all CQs", pContext->vgId); + cInfo("vgId:%d, stop all CQs", pContext->vgId); if (pContext->dbConn == NULL || pContext->master == 0) return; pthread_mutex_lock(&pContext->mutex); @@ -160,7 +161,7 @@ void cqStop(void *handle) { if (pObj->pStream) { taos_close_stream(pObj->pStream); pObj->pStream = NULL; - cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); + cInfo("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); } else { taosTmrStop(pObj->tmrId); pObj->tmrId = 0; @@ -188,7 +189,7 @@ void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSch pObj->pSchema = tdDupSchema(pSchema); pObj->rowSize = schemaTLen(pSchema); - cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); + cInfo("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); pthread_mutex_lock(&pContext->mutex); @@ -228,7 +229,7 @@ void cqDrop(void *handle) { pObj->tmrId = 0; } - cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); + cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); tdFreeSchema(pObj->pSchema); free(pObj->sqlStr); free(pObj); @@ -236,24 +237,31 @@ void cqDrop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } +static void doCreateStream(void *param, TAOS_RES *result, int code) { + SCqObj* pObj = (SCqObj*)param; + SCqContext* pContext = pObj->pContext; + SSqlObj* pSql = (SSqlObj*)result; + pContext->dbConn = pSql->pTscObj; + cqCreateStream(pContext, pObj); +} + static void cqProcessCreateTimer(void *param, void *tmrId) { SCqObj* pObj = (SCqObj*)param; SCqContext* pContext = pObj->pContext; if (pContext->dbConn == NULL) { - pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); - if (pContext->dbConn == NULL) { - cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); - } + cDebug("vgId:%d, try connect to TDengine", pContext->vgId); + taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL); + } else { + cqCreateStream(pContext, pObj); } - - cqCreateStream(pContext, pObj); } static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->pContext = pContext; if (pContext->dbConn == NULL) { + cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId); pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl); return; } @@ -262,7 +270,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); if (pObj->pStream) { pContext->num++; - cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); + cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); } else { cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); } @@ -278,7 +286,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { STSchema *pSchema = pObj->pSchema; if (pObj->pStream == NULL) return; - cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); + cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize; char *buffer = calloc(size, 1); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 5d08d1db59..e9aa6a7050 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -245,7 +245,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_SHOW_LEN 256 -#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb +#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 8mb #define TSDB_MAX_BYTES_PER_ROW 16384 #define TSDB_MAX_TAGS_LEN 16384 diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index f66ef6b7a3..f2caf30564 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -65,7 +65,7 @@ int32_t mnodeInitShow() { mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); - tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, false, mnodeFreeShowObj, "show"); + tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, true, mnodeFreeShowObj, "show"); return 0; } @@ -389,10 +389,12 @@ static bool mnodeAccquireShowObj(SShowObj *pShow) { } static void* mnodePutShowObj(SShowObj *pShow) { + const int32_t DEFAULT_SHOWHANDLE_LIFE_SPAN = tsShellActivityTimer * 6 * 1000; + if (tsMnodeShowCache != NULL) { pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1); uint64_t handleVal = (uint64_t)pShow; - SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(int64_t), &pShow, sizeof(int64_t), 6000); + SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(int64_t), &pShow, sizeof(int64_t), DEFAULT_SHOWHANDLE_LIFE_SPAN); pShow->ppShow = (void**)ppShow; mDebug("%p, show is put into cache, data:%p index:%d", pShow, ppShow, pShow->index); return pShow; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 4400927e9b..cb606df692 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1384,6 +1384,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, } pShow->numOfReads += numOfRows; + const int32_t NUM_OF_COLUMNS = 5; + + mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); mnodeDecDbRef(pDb); return numOfRows; @@ -2122,8 +2125,8 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { } pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId); - mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved", pMsg->rpcMsg.ahandle, pMsg, - pTable->info.tableId, pTable->uid); + mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pTable->uid, pTable->vgId, pTable->sid); return TSDB_CODE_SUCCESS; } diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 9238869375..6abd382f8e 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -230,4 +230,5 @@ void httpCloseContextByServer(HttpContext *pContext) { pContext->parsed = false; httpRemoveContextFromEpoll(pContext); + httpReleaseContext(pContext, true); } diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index 881fa55fb7..883fa574ff 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -257,20 +257,20 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo HttpEncodeMethod *encode = pContext->encodeMethod; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd, - pContext->user, pContext->session->taos, tstrerror(code), (SSqlObj *)result); + httpError("context:%p, fd:%d, user:%s, query error, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd, + pContext->user, tstrerror(code), (SSqlObj *)result); return; } if (code < 0) { SSqlObj *pObj = (SSqlObj *)result; if (code == TSDB_CODE_TSC_INVALID_SQL) { - httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p, error:%s", pContext, - pContext->fd, pContext->user, pContext->session->taos, tstrerror(code), pObj, pObj->cmd.payload); + httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, + pContext->fd, pContext->user, tstrerror(code), pObj, pObj->cmd.payload); httpSendTaosdInvalidSqlErrorResp(pContext, pObj->cmd.payload); } else { - httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p", pContext, pContext->fd, - pContext->user, pContext->session->taos, tstrerror(code), pObj); + httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd, + pContext->user, tstrerror(code), pObj); httpSendErrorResp(pContext, code); } taos_free_result(result); diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index 5303251d98..704df9f3f2 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -73,12 +73,11 @@ typedef struct SDiskbasedResultBuf { bool comp; // compressed before flushed to disk int32_t nextPos; // next page flush position - const void* handle; // for debug purpose + const void* handle; // for debug purpose SResultBufStatis statis; } SDiskbasedResultBuf; -#define DEFAULT_INTERN_BUF_PAGE_SIZE (4096L) -#define DEFAULT_INMEM_BUF_PAGES 10 +#define DEFAULT_INTERN_BUF_PAGE_SIZE (256L) // in bytes #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} /** diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 314159484d..5320e5622e 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -39,7 +39,6 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf } #define curTimeWindowIndex(_winres) ((_winres)->curIndex) -#define GET_TIMEWINDOW(_winresInfo, _win) (STimeWindow) {(_win)->skey, ((_win)->skey + (_winresInfo)->interval - 1)} #define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1) bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index a78bb7ec51..28b9a60102 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -108,7 +108,7 @@ extern "C" { #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) -#define MAX_INTERVAL_TIME_WINDOW 10000000 +#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results #define TOP_BOTTOM_QUERY_LIMIT 100 enum { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d3012e7662..f2d324e376 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -187,7 +187,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, SDataStatis *pStatis, void *param, int32_t colIndex); static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); -static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo); +static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); static void buildTagQueryResult(SQInfo *pQInfo); @@ -782,6 +782,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; + bool hasPrev = pCtx[0].preAggVals.isSet; + if (IS_MASTER_SCAN(pRuntimeEnv) || closed) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { pCtx[k].nStartQueryTimestamp = pWin->skey; @@ -794,11 +796,17 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed } // not a whole block involved in query processing, statistics data can not be used - pCtx[k].preAggVals.isSet = (forwardStep == numOfTotal); + // NOTE: the original value of isSet have been changed here + if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) { + pCtx[k].preAggVals.isSet = false; + } if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunction(&pCtx[k]); } + + // restore it + pCtx[k].preAggVals.isSet = hasPrev; } } } @@ -1975,8 +1983,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo // todo handle the case the the order irrelevant query type mixed up with order critical query type // descending order query for last_row query if (isFirstLastRowQuery(pQuery)) { - qDebug("QInfo:%p scan order changed for last_row query, old:%d, new:%d", GET_QINFO_ADDR(pQuery), - pQuery->order.order, TSDB_ORDER_ASC); + qDebug("QInfo:%p scan order changed for last_row query, old:%d, new:%d", pQInfo, pQuery->order.order, TSDB_ORDER_ASC); pQuery->order.order = TSDB_ORDER_ASC; if (pQuery->window.skey > pQuery->window.ekey) { @@ -2078,13 +2085,14 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) { SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t MIN_ROWS_PER_PAGE = 4; *rowsize = (int32_t)(pQuery->rowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery)); int32_t overhead = sizeof(tFilePage); // one page contains at least two rows *ps = DEFAULT_INTERN_BUF_PAGE_SIZE; - while(((*rowsize) * 2) > (*ps) - overhead) { + while(((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) { *ps = (*ps << 1u); } @@ -2112,7 +2120,7 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat } } - // no statistics data + // no statistics data, load the true data block if (index == -1) { return true; } @@ -2122,8 +2130,17 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat return true; } - // all points in current column are NULL, no need to check its boundary value + // all data in current column are NULL, no need to check its boundary value if (pDataStatis[index].numOfNull == numOfRows) { + + // if isNULL query exists, load the null data column + for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) { + SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j]; + if (pFilterElem->fp == isNull_filter) { + return true; + } + } + continue; } @@ -2949,11 +2966,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { STableQueryInfo *item = taosArrayGetP(pGroup, i); SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid); - pageList = list; - tid = TSDB_TABLEID(item->pTable)->tid; - if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) { pTableList[numOfTables++] = item; + tid = TSDB_TABLEID(item->pTable)->tid; + pageList = list; } } @@ -3707,7 +3723,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void return pTableQueryInfo; } -void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo) { +void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; } @@ -4383,6 +4399,8 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { return true; } +static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); + static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -4422,16 +4440,20 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) // update the query time window pQuery->window = cond.twindow; - size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); - for(int32_t i = 0; i < numOfGroups; ++i) { - SArray *group = GET_TABLEGROUP(pQInfo, i); + if (pQInfo->tableGroupInfo.numOfTables == 0) { + pQInfo->tableqinfoGroupInfo.numOfTables = 0; + } else { + size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); + for(int32_t i = 0; i < numOfGroups; ++i) { + SArray *group = GET_TABLEGROUP(pQInfo, i); - size_t t = taosArrayGetSize(group); - for (int32_t j = 0; j < t; ++j) { - STableQueryInfo *pCheckInfo = taosArrayGetP(group, j); + size_t t = taosArrayGetSize(group); + for (int32_t j = 0; j < t; ++j) { + STableQueryInfo *pCheckInfo = taosArrayGetP(group, j); - pCheckInfo->win = pQuery->window; - pCheckInfo->lastKey = pCheckInfo->win.skey; + pCheckInfo->win = pQuery->window; + pCheckInfo->lastKey = pCheckInfo->win.skey; + } } } } else if (isPointInterpoQuery(pQuery)) { @@ -6317,17 +6339,43 @@ _error: } static void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) { - if (pFilter == NULL) { + if (pFilter == NULL || numOfFilters == 0) { return; } + for (int32_t i = 0; i < numOfFilters; i++) { if (pFilter[i].filterstr) { free((void*)(pFilter[i].pz)); } } + free(pFilter); } +static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { + if (pTableqinfoGroupInfo->pGroupList != NULL) { + int32_t numOfGroups = (int32_t) taosArrayGetSize(pTableqinfoGroupInfo->pGroupList); + for (int32_t i = 0; i < numOfGroups; ++i) { + SArray *p = taosArrayGetP(pTableqinfoGroupInfo->pGroupList, i); + + size_t num = taosArrayGetSize(p); + for(int32_t j = 0; j < num; ++j) { + STableQueryInfo* item = taosArrayGetP(p, j); + destroyTableQueryInfoImpl(item); + } + + taosArrayDestroy(p); + } + } + + taosArrayDestroy(pTableqinfoGroupInfo->pGroupList); + taosHashCleanup(pTableqinfoGroupInfo->map); + + pTableqinfoGroupInfo->pGroupList = NULL; + pTableqinfoGroupInfo->map = NULL; + pTableqinfoGroupInfo->numOfTables = 0; +} + static void freeQInfo(SQInfo *pQInfo) { if (!isValidQInfo(pQInfo)) { return; @@ -6388,25 +6436,9 @@ static void freeQInfo(SQInfo *pQInfo) { taosTFree(pQuery); } - // todo refactor, extract method to destroytableDataInfo - if (pQInfo->tableqinfoGroupInfo.pGroupList != NULL) { - int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo)); - for (int32_t i = 0; i < numOfGroups; ++i) { - SArray *p = GET_TABLEGROUP(pQInfo, i); - - size_t num = taosArrayGetSize(p); - for(int32_t j = 0; j < num; ++j) { - STableQueryInfo* item = taosArrayGetP(p, j); - destroyTableQueryInfo(item); - } - - taosArrayDestroy(p); - } - } + doDestroyTableQueryInfo(&pQInfo->tableqinfoGroupInfo); taosTFree(pQInfo->pBuf); - taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList); - taosHashCleanup(pQInfo->tableqinfoGroupInfo.map); tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); taosArrayDestroy(pQInfo->arrTableIdInfo); diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 8660d9f4fe..f186726c01 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -38,6 +38,9 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ pFillInfo->numOfTags = numOfTags; pFillInfo->numOfCols = numOfCols; pFillInfo->precision = precision; + + pFillInfo->interval.interval = slidingTime; + pFillInfo->interval.intervalUnit = slidingUnit; pFillInfo->interval.sliding = slidingTime; pFillInfo->interval.slidingUnit = slidingUnit; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f59bf62ec5..cb318d5c24 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -580,6 +580,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); if (pConn->chandle == NULL) { + tError("failed to connect to:0x%x:%d", pConn->peerIp, pConn->peerPort); + terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcCloseConn(pConn); pConn = NULL; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index f96b902efd..3f57a8b5cd 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -511,9 +511,9 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { sInfo("%s, it is configured", pPeer->id); int ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { - sDebug("%s, start to check peer connection", pPeer->id); int32_t checkMs = 100 + (pNode->vgId * 10) % 100; - if (pNode->vgId) checkMs = tsStatusInterval * 2000 + 100; + if (pNode->vgId > 1) checkMs = tsStatusInterval * 2000 + checkMs; + sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index f390208af1..619b3dfca4 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2114,6 +2114,8 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) { totalNumOfTable++; taosArrayPush(pGroup, &keyInfo); } else { + taosArrayDestroy(pGroup); + taosArrayRemove(groupList->pGroupList, j); numOfGroups -= 1; j -= 1; @@ -2736,4 +2738,5 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { } taosArrayDestroy(pGroupList->pGroupList); + pGroupList->numOfTables = 0; } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 4d737ebe66..49b9996cf4 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -260,7 +260,12 @@ static void incRefFn(void* ptNode) { } void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { - if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0 || pCacheObj->deleting == 1) { + if (pCacheObj == NULL || pCacheObj->deleting == 1) { + return NULL; + } + + if (taosHashGetSize(pCacheObj->pHashTable) == 0) { + atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); return NULL; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 017eeaf426..58e97075ac 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -58,7 +58,8 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return TSDB_CODE_APP_NOT_READY; // TODO: Later, let slave to support query - if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { + // if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { + if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); return TSDB_CODE_APP_NOT_READY; } diff --git a/tests/perftest-scripts/coverage_test.sh b/tests/perftest-scripts/coverage_test.sh index a0c8fe4b3f..5085ec89d0 100755 --- a/tests/perftest-scripts/coverage_test.sh +++ b/tests/perftest-scripts/coverage_test.sh @@ -53,7 +53,7 @@ function buildTDengine { function runGeneralCaseOneByOne { while read -r line; do if [[ $line =~ ^./test.sh* ]]; then - case=`echo $line | grep -w "general\|unique\/mnode\/mgmt33.sim\|unique\/stable\/dnode3.sim\|unique\/cluster\/balance3.sim\|unique\/arbitrator\/offline_replica2_alterTable_online.sim"|awk '{print $NF}'` + case=`echo $line | grep sim$ |awk '{print $NF}'` if [ -n "$case" ]; then ./test.sh -f $case > /dev/null 2>&1 && \ diff --git a/tests/pytest/client/twoClients.py b/tests/pytest/client/twoClients.py new file mode 100644 index 0000000000..1a1b36c554 --- /dev/null +++ b/tests/pytest/client/twoClients.py @@ -0,0 +1,96 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import os +import sys +sys.path.insert(0, os.getcwd()) +from util.log import * +from util.sql import * +from util.dnodes import * +import taos + + +class TwoClients: + def initConnection(self): + self.host = "127.0.0.1" + self.user = "root" + self.password = "taosdata" + self.config = "/home/xp/git/TDengine/sim/dnode1/cfg" + + def run(self): + tdDnodes.init("") + tdDnodes.setTestCluster(False) + tdDnodes.setValgrind(False) + + tdDnodes.stopAll() + tdDnodes.deploy(1) + tdDnodes.start(1) + + # first client create a stable and insert data + conn1 = taos.connect(self.host, self.user, self.password, self.config) + cursor1 = conn1.cursor() + cursor1.execute("drop database if exists db") + cursor1.execute("create database db") + cursor1.execute("use db") + cursor1.execute("create table tb (ts timestamp, id int) tags(loc nchar(30))") + cursor1.execute("insert into t0 using tb tags('beijing') values(now, 1)") + + # second client alter the table created by cleint + conn2 = taos.connect(self.host, self.user, self.password, self.config) + cursor2 = conn2.cursor() + cursor2.execute("use db") + cursor2.execute("alter table tb add column name nchar(30)") + + # first client should not be able to use the origin metadata + tdSql.init(cursor1, True) + tdSql.error("insert into t0 values(now, 2)") + + # first client should be able to insert data with udpated medadata + tdSql.execute("insert into t0 values(now, 2, 'test')") + tdSql.query("select * from tb") + tdSql.checkRows(2) + + # second client drop the table + cursor2.execute("drop table t0") + cursor2.execute("create table t0 using tb tags('beijing')") + + tdSql.execute("insert into t0 values(now, 2, 'test')") + tdSql.query("select * from tb") + tdSql.checkRows(1) + + # error expected for two clients drop the same cloumn + cursor2.execute("alter table tb drop column name") + tdSql.error("alter table tb drop column name") + + cursor2.execute("alter table tb add column speed int") + tdSql.error("alter table tb add column speed int") + + + tdSql.execute("alter table tb add column size int") + tdSql.query("describe tb") + tdSql.checkRows(5) + tdSql.checkData(0, 0, "ts") + tdSql.checkData(1, 0, "id") + tdSql.checkData(2, 0, "speed") + tdSql.checkData(3, 0, "size") + tdSql.checkData(4, 0, "loc") + + + cursor1.close() + cursor2.close() + conn1.close() + conn2.close() + +clients = TwoClients() +clients.initConnection() +clients.run() \ No newline at end of file diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index bfa546f92e..8b92461c0e 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -16,6 +16,7 @@ python3 ./test.py -f insert/nchar.py python3 ./test.py -f insert/nchar-unicode.py python3 ./test.py -f insert/multi.py python3 ./test.py -f insert/randomNullCommit.py +python3 insert/retentionpolicy.py python3 ./test.py -f table/column_name.py python3 ./test.py -f table/column_num.py @@ -187,6 +188,13 @@ python3 ./test.py -f functions/function_top.py #python3 ./test.py -f functions/function_twa.py python3 queryCount.py python3 ./test.py -f query/queryGroupbyWithInterval.py +python3 client/twoClients.py +python3 test.py -f query/queryInterval.py # tools python3 test.py -f tools/taosdemo.py + +# subscribe +python3 test.py -f subscribe/singlemeter.py +python3 test.py -f subscribe/stability.py +python3 test.py -f subscribe/supertable.py \ No newline at end of file diff --git a/tests/pytest/insert/retentionpolicy.py b/tests/pytest/insert/retentionpolicy.py new file mode 100644 index 0000000000..bd294a24f3 --- /dev/null +++ b/tests/pytest/insert/retentionpolicy.py @@ -0,0 +1,108 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +import datetime +sys.path.insert(0, os.getcwd()) +import taos +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestRetetion: + def init(self): + self.queryRows=0 + tdLog.debug("start to execute %s" % __file__) + tdLog.info("prepare cluster") + tdDnodes.init("") + tdDnodes.setTestCluster(False) + tdDnodes.setValgrind(False) + tdDnodes.stopAll() + tdDnodes.deploy(1) + tdDnodes.start(1) + print(tdDnodes.getDnodesRootDir()) + self.conn = taos.connect(config=tdDnodes.getSimCfgPath()) + tdSql.init(self.conn.cursor()) + tdSql.execute('reset query cache') + def checkRows(self, expectRows,sql): + if self.queryRows == expectRows: + tdLog.info("sql:%s, queryRows:%d == expect:%d" % (sql, self.queryRows, expectRows)) + else: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, sql, self.queryRows, expectRows) + os.system("timedatectl set-ntp true") + tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args) + + def run(self): + + tdLog.info("=============== step1") + tdSql.execute('create database test keep 3 days 1;') + tdSql.execute('use test;') + tdSql.execute('create table test(ts timestamp,i int);') + + cmd = 'insert into test values(now-2d,11)(now-1d,11)(now,11)(now+1d,11);' + tdLog.info(cmd) + tdSql.execute(cmd) + tdSql.query('select * from test') + tdSql.checkRows(4) + + tdLog.info("=============== step2") + tdDnodes.stop(1) + os.system("timedatectl set-ntp false") + os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") + tdDnodes.start(1) + cmd = 'insert into test values(now,11);' + tdLog.info(cmd) + tdSql.execute(cmd) + tdSql.query('select * from test') + tdSql.checkRows(5) + + tdLog.info("=============== step3") + tdDnodes.stop(1) + os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") + tdDnodes.start(1) + cmd = 'insert into test values(now-1d,11);' + tdLog.info(cmd) + tdSql.execute(cmd) + tdSql.query('select * from test') + tdSql.checkRows(6) + tdLog.info("=============== step4") + tdDnodes.stop(1) + tdDnodes.start(1) + cmd = 'insert into test values(now,11);' + tdLog.info(cmd) + tdSql.execute(cmd) + tdSql.query('select * from test') + tdSql.checkRows(7) + + tdLog.info("=============== step5") + tdDnodes.stop(1) + tdDnodes.start(1) + cmd='select * from test where ts > now-1d' + queryRows=tdSql.query('select * from test where ts > now-1d') + self.checkRows(1,cmd) + + def stop(self): + os.system("timedatectl set-ntp true") + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +clients = TDTestRetetion() +clients.init() +clients.run() +clients.stop() + diff --git a/tests/pytest/query/queryInterval.py b/tests/pytest/query/queryInterval.py new file mode 100644 index 0000000000..db2c3fdeec --- /dev/null +++ b/tests/pytest/query/queryInterval.py @@ -0,0 +1,62 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import tdLog +from util.cases import tdCases +from util.sql import tdSql + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.ts = 1538548685000 + + def run(self): + tdSql.prepare() + + tdSql.execute("create table st (ts timestamp, voltage int) tags (loc nchar(30))") + tdSql.execute("insert into t0 using st tags('beijing') values(now, 220) (now - 15d, 221) (now - 30d, 225) (now - 35d, 228) (now - 45d, 222)") + tdSql.execute("insert into t1 using st tags('shanghai') values(now, 220) (now - 60d, 221) (now - 50d, 225) (now - 40d, 228) (now - 20d, 222)") + + tdSql.query("select avg(voltage) from st interval(1n)") + tdSql.checkRows(3) + tdSql.checkData(0, 1, 223.0) + tdSql.checkData(1, 1, 225.0) + tdSql.checkData(2, 1, 220.333333) + + tdSql.query("select avg(voltage) from st interval(1n, 15d)") + tdSql.checkRows(3) + tdSql.checkData(0, 1, 224.8) + tdSql.checkData(1, 1, 222.666666) + tdSql.checkData(2, 1, 220.0) + + tdSql.query("select avg(voltage) from st interval(1n, 15d) group by loc") + tdSql.checkRows(6) + tdSql.checkData(0, 1, 225.0) + tdSql.checkData(1, 1, 223.0) + tdSql.checkData(2, 1, 220.0) + tdSql.checkData(3, 1, 224.666666) + tdSql.checkData(4, 1, 222.0) + tdSql.checkData(5, 1, 220.0) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 7de8efdfe9..9abec354c6 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -123,8 +123,8 @@ class TDSql: def checkData(self, row, col, data): self.checkRowCol(row, col) - if self.queryResult[row][col] != data: - if str(self.queryResult[row][col]) != str(data): + if self.queryResult[row][col] != data: + if str(self.queryResult[row][col]) == str(data): tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" % (self.sql, row, col, self.queryResult[row][col], data)) return diff --git a/tests/script/general/http/chunked.sim b/tests/script/general/http/chunked.sim index 8673655d96..6592c761c6 100644 --- a/tests/script/general/http/chunked.sim +++ b/tests/script/general/http/chunked.sim @@ -3,7 +3,7 @@ sleep 3000 system sh/deploy.sh -n dnode1 -i 1 system sh/cfg.sh -n dnode1 -c wallevel -v 0 system sh/cfg.sh -n dnode1 -c http -v 1 -system sh/cfg.sh -n dnode1 -c maxSQLLength -v 7340032 +system sh/cfg.sh -n dnode1 -c maxSQLLength -v 340032 system sh/exec.sh -n dnode1 -s start sleep 3000 diff --git a/tests/script/general/parser/fill.sim b/tests/script/general/parser/fill.sim index 488f807fbc..f89c27d71f 100644 --- a/tests/script/general/parser/fill.sim +++ b/tests/script/general/parser/fill.sim @@ -850,6 +850,8 @@ if $rows != 12 then return -1 endi +print =====================>td-1442 +sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev); print =============== clear sql drop database $db diff --git a/tests/script/general/parser/first_last_query.sim b/tests/script/general/parser/first_last_query.sim index d11bdccb12..8127c19230 100644 --- a/tests/script/general/parser/first_last_query.sim +++ b/tests/script/general/parser/first_last_query.sim @@ -65,22 +65,23 @@ endi if $data00 != @18-09-18 01:40:00.000@ then return -1 endi -#if $data01 != NULL then + if $data01 != 999 then return -1 -endi -#if $data02 != NULL then +endi + if $data02 != 999 then return -1 -endi -#if $data03 != NULL then +endi + if $data03 != 999.00000 then return -1 endi -#if $data04 != NULL then + if $data04 != 999.000000000 then return -1 endi + #if $data05 != NULL then if $data05 != 999 then return -1 @@ -127,7 +128,7 @@ if $data01 != 0 then return -1 endi -#add check for out of range first/last query +print =============> add check for out of range first/last query sql select first(ts),last(ts) from first_tb4 where ts>'2018-9-18 1:40:01'; if $row != 0 then return -1 @@ -136,4 +137,130 @@ endi sql select first(ts),last(ts) from first_tb4 where ts<'2018-9-17 8:50:0'; if $row != 0 then return -1 +endi + +#first/last mix up query +#select first(size),last(size) from stest interval(1d) group by tbname; +print =====================>td-1477 + +sql create table stest(ts timestamp,size INT,filenum INT) tags (appname binary(500),tenant binary(500)); +sql insert into test1 using stest tags('test1','aaa') values ('2020-09-04 16:53:54.003',210,3); +sql insert into test2 using stest tags('test1','aaa') values ('2020-09-04 16:53:56.003',210,3); +sql insert into test11 using stest tags('test11','bbb') values ('2020-09-04 16:53:57.003',210,3); +sql insert into test12 using stest tags('test11','bbb') values ('2020-09-04 16:53:58.003',210,3); +sql insert into test21 using stest tags('test21','ccc') values ('2020-09-04 16:53:59.003',210,3); +sql insert into test22 using stest tags('test21','ccc') values ('2020-09-04 16:54:54.003',210,3); +sql select sum(size) from stest group by appname; +if $rows != 3 then + return -1 +endi + +if $data00 != 420 then + return -1 +endi +if $data10 != 420 then + return -1 +endi +if $data20 != 420 then + return -1 +endi + +if $data01 != @test1@ then +return -1 +endi +if $data11 != @test11@ then +return -1 +endi +if $data21 != @test21@ then +return -1 +endi + +sql select sum(size) from stest interval(1d) group by appname; +if $rows != 3 then + return -1 +endi + +#2020-09-04 00:00:00.000 | 420 | test1 | +#2020-09-04 00:00:00.000 | 420 | test11 | +#2020-09-04 00:00:00.000 | 420 | test21 | +if $data00 != @20-09-04 00:00:00.000@ then + return -1 +endi + +if $data10 != @20-09-04 00:00:00.000@ then + return -1 +endi + +if $data20 != @20-09-04 00:00:00.000@ then + return -1 +endi + +if $data01 != 420 then + print expect 420 , actual $data01 + return -1 +endi + +if $data11 != 420 then + return -1 +endi + +if $data21 != 420 then + return -1 +endi + +if $data02 != @test1@ then +return -1 +endi +if $data12 != @test11@ then +return -1 +endi +if $data22 != @test21@ then +return -1 +endi + +print ===================>td-1477, one table has only one block occurs this bug. +sql select first(size),count(*),LAST(SIZE) from stest where tbname in ('test1', 'test2') interval(1d) group by tbname; +if $rows != 2 then + return -1 +endi + +if $data00 != @20-09-04 00:00:00.000@ then + return -1 +endi + +if $data01 != 210 then + return -1 +endi + +if $data02 != 1 then + return -1 +endi + +if $data03 != 210 then + return -1 +endi + +if $data04 != @test1@ then + return -1 +endi + +if $data10 != @20-09-04 00:00:00.000@ then + return -1 +endi + +if $data11 != 210 then + return -1 +endi + +if $data12 != 1 then + return -1 +endi + +if $data13 != 210 then + return -1 +endi + +if $data14 != @test2@ then + print expect test2 , actual: $data14 + return -1 endi \ No newline at end of file diff --git a/tests/script/general/parser/lastrow_query.sim b/tests/script/general/parser/lastrow_query.sim index e9d8ce413d..f81a48d5b2 100644 --- a/tests/script/general/parser/lastrow_query.sim +++ b/tests/script/general/parser/lastrow_query.sim @@ -218,4 +218,10 @@ endi if $data04 != 123.981000000 then print expect 123.981000000, actual: $data04 return -1 +endi + +sql create table tu(ts timestamp, k int) +sql select last_row(*) from tu +if $row != 0 then + return -1 endi \ No newline at end of file diff --git a/tests/script/general/parser/where.sim b/tests/script/general/parser/where.sim index fb15fb6dbe..f9fd919bd6 100644 --- a/tests/script/general/parser/where.sim +++ b/tests/script/general/parser/where.sim @@ -308,13 +308,25 @@ sleep 2000 system sh/exec.sh -n dnode1 -s start -sql select * from wh_mt0 where c3 = 'abc' and tbname in ('test_null_filter'); +sql_error select * from wh_mt0 where c3 = 'abc' and tbname in ('test_null_filter'); + +sql select * from wh_mt0 where c3 = '1' and tbname in ('test_null_filter'); if $row != 0 then return -1 endi -sql select * from wh_mt0 where c3 = 'abc' and tbname in ('test_null_filter'); -if $row != 0 then +sql select * from wh_mt0 where c3 = '1'; +if $row == 0 then + return -1 +endi + +sql select * from wh_mt0 where c3 is null and tbname in ('test_null_filter'); +if $rows != 10000 then + return -1 +endi + +sql select * from wh_mt0 where c3 is not null and tbname in ('test_null_filter'); +if $rows != 0 then return -1 endi diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 0d444a5a6e..8fccb1442f 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -128,6 +128,7 @@ echo "tmrDebugFlag 131" >> $TAOS_CFG echo "udebugFlag 135" >> $TAOS_CFG echo "sdebugFlag 135" >> $TAOS_CFG echo "wdebugFlag 135" >> $TAOS_CFG +echo "cqdebugFlag 135" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG diff --git a/tests/script/tmp/prepare.sim b/tests/script/tmp/prepare.sim index 8b8f206233..343c422e9f 100644 --- a/tests/script/tmp/prepare.sim +++ b/tests/script/tmp/prepare.sim @@ -34,11 +34,11 @@ system sh/cfg.sh -n dnode4 -c http -v 1 return # for crash_gen -system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2 +system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 10 system sh/cfg.sh -n dnode1 -c rpcMaxTime -v 101 system sh/cfg.sh -n dnode1 -c cache -v 2 system sh/cfg.sh -n dnode1 -c keep -v 36500 -system sh/cfg.sh -n dnode1 -c walLevel -v 2 +system sh/cfg.sh -n dnode1 -c walLevel -v 1 # for windows diff --git a/tests/script/unique/cluster/vgroup100.sim b/tests/script/unique/cluster/vgroup100.sim index cddb38cefd..bde6dd2462 100644 --- a/tests/script/unique/cluster/vgroup100.sim +++ b/tests/script/unique/cluster/vgroup100.sim @@ -42,9 +42,11 @@ $count = 2 while $count < 102 $db = d . $count $tb = $db . .t + $tb2 = $db . .t2 sql create database $db replica 3 cache 1 blocks 3 sql create table $tb (ts timestamp, i int) sql insert into $tb values(now, 1) + sql create table $tb2 as select count(*) from $tb interval(10s) $count = $count + 1 print insert into $tb values(now, 1) ==> finished endw @@ -74,7 +76,7 @@ print ============================== step6 system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode3 -s start -sleep 3000 +sleep 10000 print ============================== step7 diff --git a/tests/test-all.sh b/tests/test-all.sh index 275c6b1677..84b663809d 100755 --- a/tests/test-all.sh +++ b/tests/test-all.sh @@ -9,8 +9,9 @@ NC='\033[0m' function runSimCaseOneByOne { while read -r line; do - if [[ $line =~ ^run.* ]]; then - case=`echo $line | awk '{print $NF}'` + if [[ $line =~ ^./test.sh* ]]; then + case=`echo $line | grep sim$ |awk '{print $NF}'` + start_time=`date +%s` ./test.sh -f $case > /dev/null 2>&1 && \ echo -e "${GREEN}$case success${NC}" | tee -a out.log || \ @@ -54,7 +55,7 @@ if [ "$2" != "python" ]; then runSimCaseOneByOne regressionSuite.sim elif [ "$1" == "full" ]; then echo "### run TSIM full test ###" - runSimCaseOneByOne fullGeneralSuite.sim + runSimCaseOneByOne jenkins/basic.txt elif [ "$1" == "smoke" ] || [ -z "$1" ]; then echo "### run TSIM smoke test ###" runSimCaseOneByOne basicSuite.sim