Merge branch 'develop' into feature/update

This commit is contained in:
Hongze Cheng 2020-09-22 15:37:10 +08:00
commit 153288bdfb
54 changed files with 1110 additions and 589 deletions

View File

@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.0.3.0")
SET(TD_VER_NUMBER "2.0.4.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)

View File

@ -1019,5 +1019,5 @@ SELECT AVG(current),MAX(current),LEASTSQUARES(current, start_val, step_val), PER
- 表名最大长度为193每行数据最大长度16k个字符
- 列名最大长度为65最多允许1024列最少需要2列第一列必须是时间戳
- 标签最多允许128个可以0个标签总长度不超过16k个字符
- SQL语句最大长度65480个字符但可通过系统配置参数maxSQLLength修改最长可配置为8M
- SQL语句最大长度65480个字符但可通过系统配置参数maxSQLLength修改最长可配置为1M
- 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制

View File

@ -67,7 +67,7 @@ TDengine 分布式架构的逻辑结构图如下:
<center> 图 1 TDengine架构示意图 </center>
一个完整的 TDengine 系统是运行在一到多个物理节点上的,逻辑上,它包含数据节点(dnode)、TDengine客户端(taosc)以及应用(app)。系统中存在一到多个数据节点,这些数据节点组成一个集群(cluster)。应用通过taosc的API与TDengine集群进行互动。下面对每个逻辑单元进行简要介绍。
**物理节点(pnode):** pnode是一独立运行、拥有自己的计算、存储和网络能力的计算机可以是安装有OS的物理机、虚拟机或容器。物理节点由其配置的 FQDN(Fully Qualified Domain Name)来标识。
**物理节点(pnode):** pnode是一独立运行、拥有自己的计算、存储和网络能力的计算机可以是安装有OS的物理机、虚拟机或容器。物理节点由其配置的 FQDN(Fully Qualified Domain Name)来标识。TDengine完全依赖FQDN来进行网络通讯如果不了解FQDN请看博文《[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)》。
**数据节点(dnode):** dnode 是 TDengine 服务器侧执行代码 taosd 在物理节点上的一个运行实例一个工作的系统必须有至少一个数据节点。dnode包含零到多个逻辑的虚拟节点(VNODE),零或者至多一个逻辑的管理节点(mnode)。dnode在系统中的唯一标识由实例的End Point (EP )决定。EP是dnode所在物理节点的FQDN (Fully Qualified Domain Name)和系统所配置的网络端口号(Port)的组合。通过配置不同的端口,一个物理节点(一台物理机、虚拟机或容器)可以运行多个实例,或有多个数据节点。

View File

@ -2,13 +2,13 @@
多个TDengine服务器也就是多个taosd的运行实例可以组成一个集群以保证TDengine的高可靠运行并提供水平扩展能力。要了解TDengine 2.0的集群管理需要对集群的基本概念有所了解请看TDengine 2.0整体架构一章。而且在安装集群之前,先请按照[《立即开始》](https://www.taosdata.com/cn/getting-started20/)一章安装并体验单节点功能。
集群的每个数据节点是由End Point来唯一标识的End Point是由FQDN(Fully Qualified Domain Name)外加Port组成比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname可通过Linux命令`hostname -f`获取,FQDN配置参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。端口是这个数据节点对外服务的端口号缺省是6030但可以通过taos.cfg里配置参数serverPort进行修改。一个物理节点可能配置了多个hostname, TDengine会自动获取第一个但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问可以将参数fqdn设置为本节点的IP地址。
集群的每个数据节点是由End Point来唯一标识的End Point是由FQDN(Fully Qualified Domain Name)外加Port组成比如 h1.taosdata.com:6030。一般FQDN就是服务器的hostname可通过Linux命令`hostname -f`获取如何配置FQDN参考:[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)。端口是这个数据节点对外服务的端口号缺省是6030但可以通过taos.cfg里配置参数serverPort进行修改。一个物理节点可能配置了多个hostname, TDengine会自动获取第一个但也可以通过taos.cfg里配置参数fqdn进行指定。如果习惯IP地址直接访问可以将参数fqdn设置为本节点的IP地址。
TDengine的集群管理极其简单除添加和删除节点需要人工干预之外其他全部是自动完成最大程度的降低了运维的工作量。本章对集群管理的操作做详细的描述。
## 准备工作
**第零步**规划集群所有物理节点的FQDN将规划好的FQDN分别添加到每个物理节点的/etc/hostname修改每个物理节点的/etc/hosts将所有集群物理节点的IP与FQDN的对应添加好【如部署了DNS请联系网络管理员在DNS上做好相关配置】
**第零步**如果没有部署DNS服务请规划集群所有物理节点的FQDN然后按照《[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)》里的步骤将所有集群物理节点的IP与FQDN的对应关系添加好。
**第一步**如果搭建集群的物理节点中存有之前的测试数据、装过1.X的版本或者装过其他版本的TDengine请先将其删除并清空所有数据具体步骤请参考博客[《TDengine多种安装包的安装和卸载》](https://www.taosdata.com/blog/2019/08/09/566.html )
**注意1**因为FQDN的信息会写进文件如果之前没有配置或者更改FQDN且启动了TDengine。请一定在确保数据无用或者备份的前提下清理一下之前的数据rm -rf /var/lib/taos/
@ -22,7 +22,8 @@ TDengine的集群管理极其简单除添加和删除节点需要人工干预
1. 每个物理节点上执行命令`hostname -f`查看和确认所有节点的hostname是不相同的(应用驱动所在节点无需做此项检查)
2. 每个物理节点上执行`ping host`, 其中host是其他物理节点的hostname, 看能否ping通其它物理节点; 如果不能ping通需要检查网络设置, 或/etc/hosts文件(Windows系统默认路径为C:\Windows\system32\drivers\etc\hosts)或DNS的配置。如果无法ping通是无法组成集群的
3. 每个数据节点的End Point就是输出的hostname外加端口号比如h1.taosdata.com:6030
3. 从应用运行的物理节点ping taosd运行的数据节点如果无法ping通应用是无法连接taosd的请检查应用所在物理节点的DNS设置或hosts文件
4. 每个数据节点的End Point就是输出的hostname外加端口号比如h1.taosdata.com:6030
**第五步**修改TDengine的配置文件所有节点的文件/etc/taos/taos.cfg都需要修改。假设准备启动的第一个数据节点End Point为 h1.taosdata.com:6030, 其与集群配置相关参数如下:
@ -30,8 +31,8 @@ TDengine的集群管理极其简单除添加和删除节点需要人工干预
// firstEp 是每个数据节点首次启动后连接的第一个数据节点
firstEp h1.taosdata.com:6030
// 配置本数据节点的FQDN如果本机只有一个hostname, 无需配置
fqdn h1.taosdata.com
// 必须配置本数据节点的FQDN如果本机只有一个hostname, 可注释掉本配置
fqdn h1.taosdata.com
// 配置本数据节点的端口号缺省是6030
serverPort 6030
@ -40,7 +41,7 @@ serverPort 6030
arbitrator ha.taosdata.com:6042
```
一定要修改的参数是firstEp和fqdn, 其他参数可不做任何修改,除非你很清楚为什么要修改。
一定要修改的参数是firstEp和fqdn。在每个数据节点firstEp需全部配置成一样**但fqdn一定要配置成其所在数据节点的值**。其他参数可不做任何修改,除非你很清楚为什么要修改。
**加入到集群中的数据节点dnode涉及集群相关的下表11项参数必须完全相同否则不能成功加入到集群中。**
@ -114,6 +115,8 @@ taos>
## 数据节点管理
上面已经介绍如何从零开始搭建集群。集群组建完后,还可以随时添加新的数据节点进行扩容,或删除数据节点,并检查集群当前状态。
### 添加数据节点
执行CLI程序taos, 使用root账号登录进系统, 执行:

View File

@ -25,6 +25,7 @@ INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6,
- 要提高写入效率需要批量写入。一批写入的记录条数越多插入效率就越高。但一条记录不能超过16K一条SQL语句总长度不能超过64K可通过参数maxSQLLength配置最大可配置为8M
- TDengine支持多线程同时写入要进一步提高写入速度一个客户端需要打开20个以上的线程同时写。但线程数达到一定数量后无法再提高甚至还会下降因为线程切频繁切换带来额外开销。
- 对同一张表,如果新插入记录的时间戳已经存在,新记录将被直接抛弃,也就是说,在一张表里,时间戳必须是唯一的。如果应用自动生成记录,很有可能生成的时间戳是一样的,这样,成功插入的记录条数会小于应用插入的记录条数。
- 写入的数据的时间戳必须大于当前时间减去配置参数keep的时间。如果keep配置为3650天那么无法写入比3650天还老的数据。写入数据的时间戳也不能大于当前时间加配置参数days。如果days配置为2那么无法写入比当前时间还晚2天的数据。
## Prometheus直接写入
[Prometheus](https://www.prometheus.io/)作为Cloud Native Computing Fundation毕业的项目在性能监控以及K8S性能监控领域有着非常广泛的应用。TDengine提供一个小工具[Bailongma](https://github.com/taosdata/Bailongma)只需在Prometheus做简单配置无需任何代码就可将Prometheus采集的数据直接写入TDengine并按规则在TDengine自动创建库和相关表项。博文[用Docker容器快速搭建一个Devops监控Demo](https://www.taosdata.com/blog/2020/02/03/1189.html)即是采用bailongma将Prometheus和Telegraf的数据写入TDengine中的示例可以参考。

View File

@ -326,7 +326,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*interBytes = dataBytes + sizeof(SLastrowInfo);
*interBytes = dataBytes;
} else {
return TSDB_CODE_TSC_INVALID_SQL;
}
@ -711,13 +711,16 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY en
if (pCtx->aOutputBuf == NULL) {
return BLK_DATA_ALL_NEEDED;
}
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
if (pInfo->hasResult != DATA_SET_FLAG) {
return BLK_DATA_ALL_NEEDED;
} else { // data in current block is not earlier than current result
return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
}
return BLK_DATA_ALL_NEEDED;
// TODO pCtx->aOutputBuf is the previous windowRes output buffer, not current unloaded block. so the following filter
// is invalid
// SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
// if (pInfo->hasResult != DATA_SET_FLAG) {
// return BLK_DATA_ALL_NEEDED;
// } else { // data in current block is not earlier than current result
// return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
// }
}
static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
@ -730,12 +733,16 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end
return BLK_DATA_ALL_NEEDED;
}
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
if (pInfo->hasResult != DATA_SET_FLAG) {
return BLK_DATA_ALL_NEEDED;
} else {
return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
}
return BLK_DATA_ALL_NEEDED;
// TODO pCtx->aOutputBuf is the previous windowRes output buffer, not current unloaded block. so the following filter
// is invalid
// SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
// if (pInfo->hasResult != DATA_SET_FLAG) {
// return BLK_DATA_ALL_NEEDED;
// } else {
// return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
// }
}
//////////////////////////////////////////////////////////////////////////////////////////////
@ -1836,8 +1843,10 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
pInfo1->hasResult = DATA_SET_FLAG;
DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts);
} else {
DO_UPDATE_TAG_COLUMNS(pCtx, pCtx->ptsList[pCtx->size - 1]);
}
SET_VAL(pCtx, pCtx->size, 1);
}

View File

@ -546,6 +546,10 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0;
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
// wait for the callback function to post the semaphore
@ -574,7 +578,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
free(normal->sql);
}
tscFreeSqlObj(pStmt->pSql);
taos_free_result(pStmt->pSql);
free(pStmt);
return TSDB_CODE_SUCCESS;
}

View File

@ -351,7 +351,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_DESCRIBE_TABLE: {
SStrToken* pToken = &pInfo->pDCLInfo->a[0];
const char* msg1 = "invalid table name";
const char* msg2 = "table name is too long";
const char* msg2 = "table name too long";
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
@ -410,7 +410,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char* msg3 = "name too long";
pCmd->command = pInfo->type;
// tDCLSQL* pDCL = pInfo->pDCLInfo;
SUserInfo* pUser = &pInfo->pDCLInfo->user;
SStrToken* pName = &pUser->user;
@ -773,7 +772,7 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql) {
const char* msg1 = "name too long";
const char* msg2 = "current database name is invalid";
const char* msg2 = "current database or database name invalid";
SSqlCmd* pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
@ -4874,6 +4873,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
{"cDebugFlag", 10}, {"httpDebugFlag", 13}, {"qDebugflag", 10}, {"sdbDebugFlag", 12},
{"uDebugFlag", 10}, {"tsdbDebugFlag", 13}, {"sDebugflag", 10}, {"rpcDebugFlag", 12},
{"dDebugFlag", 10}, {"mqttDebugFlag", 13}, {"wDebugFlag", 10}, {"tmrDebugFlag", 12},
{"cqDebugFlag", 11},
};
SStrToken* pOptionToken = &pOptions->a[1];
@ -5288,9 +5288,12 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
int32_t tagLength = 0;
size_t size = taosArrayGetSize(pQueryInfo->exprList);
//todo is 0??
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TAGPRJ || pExpr->functionId == TSDB_FUNC_TAG) {
@ -5302,8 +5305,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
}
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
@ -5311,7 +5313,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
!(pExpr->functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->colInfo.flag))) {
SSchema* pColSchema = &pSchema[pExpr->colInfo.colIndex];
getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->functionId, (int32_t)pExpr->param[0].i64Key, &pExpr->resType,
&pExpr->resBytes, &pExpr->interBytes, tagLength, true);
&pExpr->resBytes, &pExpr->interBytes, tagLength, isSTable);
}
}
}
@ -5322,7 +5324,7 @@ static int32_t doUpdateSqlFunctionForColPrj(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_PRJ && (!TSDB_COL_IS_UD_COL(pExpr->colInfo.flag))) {
if (pExpr->functionId == TSDB_FUNC_PRJ && (!TSDB_COL_IS_UD_COL(pExpr->colInfo.flag) && (pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX))) {
bool qualifiedCol = false;
for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, j);
@ -5420,13 +5422,6 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
int16_t numOfSelectivity = 0;
int16_t numOfAggregation = 0;
// todo is 0??
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
if (!isSTable) {
return TSDB_CODE_SUCCESS;
}
size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, i);
@ -6303,6 +6298,11 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
}
if (pQueryInfo->interval.interval > 0 && pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') {
bool initialWindows = TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER);
if (initialWindows) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
int64_t timeRange = ABS(pQueryInfo->window.skey - pQueryInfo->window.ekey);
// number of result is not greater than 10,000,000
if ((timeRange == 0) || (timeRange / pQueryInfo->interval.interval) > MAX_INTERVAL_TIME_WINDOW) {

View File

@ -188,8 +188,8 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
if (tscShouldFreeHeartBeat(pHB)) {
tscDebug("%p free HB object and release connection", pHB);
tscFreeSqlObj(pHB);
tscCloseTscObj(pObj);
pObj->pHb = 0;
taos_free_result(pHB);
} else {
int32_t code = tscProcessSql(pHB);
if (code != TSDB_CODE_SUCCESS) {
@ -1959,6 +1959,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
return 0;
}
// TODO multithread problem
static void createHBObj(STscObj* pObj) {
if (pObj->pHb != NULL) {
return;
@ -1987,10 +1988,13 @@ static void createHBObj(STscObj* pObj) {
pSql->pTscObj = pObj;
pSql->signature = pSql;
pObj->pHb = pSql;
T_REF_INC(pObj);
tscAddSubqueryInfo(&pObj->pHb->cmd);
int64_t ad = (int64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000);
T_REF_INC(pObj);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}
@ -2017,8 +2021,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pObj->connId = htonl(pConnect->connId);
createHBObj(pObj);
// taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0;
}

View File

@ -236,13 +236,21 @@ TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t us
return taos_connect(ipBuf, userBuf, passBuf, dbBuf, port);
}
static void asyncConnCallback(void *param, TAOS_RES *tres, int code) {
SSqlObj *pSql = (SSqlObj *) tres;
assert(pSql != NULL);
pSql->fetchFp(pSql->param, tres, code);
}
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) {
SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, fp, param, taos);
SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, taos);
if (pSql == NULL) {
return NULL;
}
pSql->fetchFp = fp;
pSql->res.code = tscProcessSql(pSql);
tscDebug("%p DB async connection is opening", taos);
return taos;
@ -255,33 +263,17 @@ void taos_close(TAOS *taos) {
return;
}
if (pObj->pHb != NULL) {
if (pObj->pHb->pRpcCtx != NULL) { // wait for rsp from dnode
rpcCancelRequest(pObj->pHb->pRpcCtx);
SSqlObj* pHb = pObj->pHb;
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode
rpcCancelRequest(pHb->pRpcCtx);
pHb->pRpcCtx = NULL;
}
tscSetFreeHeatBeat(pObj);
tscFreeSqlObj(pObj->pHb);
tscDebug("%p, HB is freed", pHb);
taos_free_result(pHb);
}
// free all sqlObjs created by using this connect before free the STscObj
// while(1) {
// pthread_mutex_lock(&pObj->mutex);
// void* p = pObj->sqlList;
// pthread_mutex_unlock(&pObj->mutex);
//
// if (p == NULL) {
// break;
// }
//
// tscDebug("%p waiting for sqlObj to be freed, %p", pObj, p);
// taosMsleep(100);
//
// // todo fix me!! two threads call taos_free_result will cause problem.
// tscDebug("%p free :%p", pObj, p);
// taos_free_result(p);
// }
int32_t ref = T_REF_DEC(pObj);
assert(ref >= 0);

View File

@ -515,6 +515,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return;
}
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
@ -568,6 +572,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream->pSql = pSql;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
@ -575,6 +580,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscFreeSqlObj(pSql);
return NULL;
}
strtolower(pSql->sqlstr, sqlstr);
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
@ -615,10 +621,9 @@ void taos_close_stream(TAOS_STREAM *handle) {
tscDebug("%p stream:%p is closed", pSql, pStream);
// notify CQ to release the pStream object
pStream->fp(pStream->param, NULL, NULL);
tscFreeSqlObj(pSql);
pStream->pSql = NULL;
taos_free_result(pSql);
taosTFree(pStream);
}
}

View File

@ -152,6 +152,10 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail;
}
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem);
@ -173,7 +177,11 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
fail:
tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
if (pSql != NULL) {
tscFreeSqlObj(pSql);
if (pSql->self != NULL) {
taos_free_result(pSql);
} else {
tscFreeSqlObj(pSql);
}
pSql = NULL;
}
if (pSub != NULL) {
@ -494,6 +502,10 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
}
}
if (pSub->pSql != NULL) {
taos_free_result(pSub->pSql);
}
tscFreeSqlObj(pSub->pSql);
taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem);

View File

@ -278,7 +278,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscDebug("%p subIndex: %d, no need to launch query, ignore it", pSql, i);
tscDestroyJoinSupporter(pSupporter);
tscFreeSqlObj(pPrevSub);
taos_free_result(pPrevSub);
pSql->pSubs[i] = NULL;
continue;
@ -1383,7 +1383,7 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState
taosTFree(pSupport->localBuffer);
taosTFree(pSupport);
tscFreeSqlObj(pSub);
taos_free_result(pSub);
}
free(pState);

View File

@ -1743,8 +1743,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
}
pNew->pTscObj = pSql->pTscObj;
T_REF_INC(pNew->pTscObj);
pNew->signature = pNew;
SSqlCmd* pCmd = &pNew->cmd;
@ -1777,7 +1775,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
return pNew;

View File

@ -175,6 +175,7 @@ extern int32_t rpcDebugFlag;
extern int32_t odbcDebugFlag;
extern int32_t qDebugFlag;
extern int32_t wDebugFlag;
extern int32_t cqDebugFlag;
extern int32_t debugFlag;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -204,6 +204,7 @@ int32_t debugFlag = 0;
int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 135;
int32_t (*monitorStartSystemFp)() = NULL;
void (*monitorStopSystemFp)() = NULL;
@ -223,12 +224,13 @@ void taosSetAllDebugFlag() {
httpDebugFlag = debugFlag;
mqttDebugFlag = debugFlag;
monitorDebugFlag = debugFlag;
qDebugFlag = debugFlag;
rpcDebugFlag = debugFlag;
uDebugFlag = debugFlag;
sDebugFlag = debugFlag;
wDebugFlag = debugFlag;
tsdbDebugFlag = debugFlag;
qDebugFlag = debugFlag;
cqDebugFlag = debugFlag;
uInfo("all debug flag are set to %d", debugFlag);
}
}
@ -1220,6 +1222,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "cqDebugFlag";
cfg.ptr = &cqDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG;
cfg.minValue = 0;
cfg.maxValue = 255;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "tscEnableRecordSql";
cfg.ptr = &tsTscEnableRecordSql;
cfg.valType = TAOS_CFG_VTYPE_INT32;

@ -1 +1 @@
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
Subproject commit 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0

View File

@ -1,101 +1,102 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.0</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
<description>TDengine JDBC Driver</description>
<licenses>
<license>
<name>GNU AFFERO GENERAL PUBLIC LICENSE Version 3</name>
<url>https://github.com/taosdata/TDengine/blob/master/LICENSE</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<connection>scm:git:git://github.com/taosdata/TDengine.git</connection>
<developerConnection>scm:git:git@github.com:taosdata/TDengine.git</developerConnection>
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.0</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
<tag>HEAD</tag>
</scm>
<developers>
<developer>
<name>taosdata</name>
<email>support@taosdata.com</email>
<organization>https://www.taosdata.com/</organization>
<organizationUrl>https://www.taosdata.com/</organizationUrl>
</developer>
</developers>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
<commons-logging.version>1.1.2</commons-logging.version>
<commons-lang3.version>3.5</commons-lang3.version>
</properties>
<dependencies>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>${commons-logging.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly-jar.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<encoding>UTF-8</encoding>
<source>${java.version}</source>
<target>${java.version}</target>
<debug>true</debug>
<showDeprecation>true</showDeprecation>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
</plugins>
</build>
<description>TDengine JDBC Driver</description>
<licenses>
<license>
<name>GNU AFFERO GENERAL PUBLIC LICENSE Version 3</name>
<url>https://github.com/taosdata/TDengine/blob/master/LICENSE</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<connection>scm:git:git://github.com/taosdata/TDengine.git</connection>
<developerConnection>scm:git:git@github.com:taosdata/TDengine.git</developerConnection>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
<tag>HEAD</tag>
</scm>
<developers>
<developer>
<name>taosdata</name>
<email>support@taosdata.com</email>
<organization>https://www.taosdata.com/</organization>
<organizationUrl>https://www.taosdata.com/</organizationUrl>
</developer>
</developers>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
<commons-logging.version>1.1.2</commons-logging.version>
<commons-lang3.version>3.5</commons-lang3.version>
</properties>
<dependencies>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>${commons-logging.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly-jar.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<encoding>UTF-8</encoding>
<source>${java.version}</source>
<target>${java.version}</target>
<debug>true</debug>
<showDeprecation>true</showDeprecation>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -53,66 +53,12 @@ public class TSDBConnection implements Connection {
public TSDBConnection(Properties info, TSDBDatabaseMetaData meta) throws SQLException {
this.dbMetaData = meta;
//load taos.cfg start
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile);
if (!endpoints.isEmpty()) {
info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
}
//load taos.cfg end
connect(info.getProperty(TSDBDriver.PROPERTY_KEY_HOST),
Integer.parseInt(info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "0")),
info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME), info.getProperty(TSDBDriver.PROPERTY_KEY_USER),
info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
}
private List<String> loadConfigEndpoints(File cfgFile) {
List<String> endpoints = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
endpoints.add(line.substring(line.indexOf('p') + 1).trim());
}
if (endpoints.size() > 1)
break;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return endpoints;
}
/**
* @param cfgDirPath
* @return return the config dir
**/
private File loadConfigDir(String cfgDirPath) {
if (cfgDirPath == null)
return loadDefaultConfigDir();
File cfgDir = new File(cfgDirPath);
if (!cfgDir.exists())
return loadDefaultConfigDir();
return cfgDir;
}
/**
* @return search the default config dir, if the config dir is not exist will return null
*/
private File loadDefaultConfigDir() {
File cfgDir;
File cfgDir_linux = new File("/etc/taos");
cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
File cfgDir_windows = new File("C:\\TDengine\\cfg");
cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
return cfgDir;
}
private void connect(String host, int port, String dbName, String user, String password) throws SQLException {
this.connector = new TSDBJNIConnector();
this.connector.connect(host, port, dbName, user, password);

View File

@ -68,15 +68,15 @@ public class TSDBDatabaseMetaData implements java.sql.DatabaseMetaData {
}
public boolean nullsAreSortedLow() throws SQLException {
return false;
return !nullsAreSortedHigh();
}
public boolean nullsAreSortedAtStart() throws SQLException {
return false;
return true;
}
public boolean nullsAreSortedAtEnd() throws SQLException {
return false;
return !nullsAreSortedAtStart();
}
public String getDatabaseProductName() throws SQLException {

View File

@ -14,24 +14,29 @@
*****************************************************************************/
package com.taosdata.jdbc;
import java.io.*;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.logging.Logger;
/**
* The Java SQL framework allows for multiple database drivers. Each driver
* should supply a class that implements the Driver interface
*
*
* <p>
* The DriverManager will try to load as many drivers as it can find and then
* for any given connection request, it will ask each driver in turn to try to
* connect to the target URL.
*
*
* <p>
* It is strongly recommended that each Driver class should be small and stand
* alone so that the Driver class can be loaded and queried without bringing in
* vast quantities of supporting code.
*
*
* <p>
* When a Driver class is loaded, it should create an instance of itself and
* register it with the DriverManager. This means that a user can load and
@ -39,38 +44,41 @@ import java.util.logging.Logger;
*/
public class TSDBDriver implements java.sql.Driver {
@Deprecated
private static final String URL_PREFIX1 = "jdbc:tsdb://";
private static final String URL_PREFIX = "jdbc:taos://";
/**
* Key used to retrieve the database value from the properties instance passed
* to the driver.
*/
public static final String PROPERTY_KEY_DBNAME = "dbname";
@Deprecated
private static final String URL_PREFIX1 = "jdbc:TSDB://";
/**
* Key used to retrieve the host value from the properties instance passed to
* the driver.
*/
public static final String PROPERTY_KEY_HOST = "host";
/**
* Key used to retrieve the password value from the properties instance passed
* to the driver.
*/
public static final String PROPERTY_KEY_PASSWORD = "password";
private static final String URL_PREFIX = "jdbc:TAOS://";
/**
* Key used to retrieve the port number value from the properties instance
* passed to the driver.
*/
public static final String PROPERTY_KEY_PORT = "port";
/**
* Key used to retrieve the database value from the properties instance passed
* to the driver.
*/
public static final String PROPERTY_KEY_DBNAME = "dbname";
/**
* Key used to retrieve the host value from the properties instance passed to
* the driver.
*/
public static final String PROPERTY_KEY_HOST = "host";
/**
* Key used to retrieve the password value from the properties instance passed
* to the driver.
*/
public static final String PROPERTY_KEY_PASSWORD = "password";
/**
* Key used to retrieve the port number value from the properties instance
* passed to the driver.
*/
public static final String PROPERTY_KEY_PORT = "port";
/**
* Key used to retrieve the user value from the properties instance passed to
* the driver.
*/
public static final String PROPERTY_KEY_USER = "user";
/**
* Key used to retrieve the user value from the properties instance passed to
* the driver.
*/
public static final String PROPERTY_KEY_USER = "user";
/**
* Key for the configuration file directory of TSDB client in properties instance
@ -95,278 +103,320 @@ public class TSDBDriver implements java.sql.Driver {
public static final String PROPERTY_KEY_PROTOCOL = "protocol";
/**
* Index for port coming out of parseHostPortPair().
*/
public final static int PORT_NUMBER_INDEX = 1;
/**
* Index for host coming out of parseHostPortPair().
*/
public final static int HOST_NAME_INDEX = 0;
/**
* Index for port coming out of parseHostPortPair().
*/
public final static int PORT_NUMBER_INDEX = 1;
private TSDBDatabaseMetaData dbMetaData = null;
/**
* Index for host coming out of parseHostPortPair().
*/
public final static int HOST_NAME_INDEX = 0;
static {
try {
java.sql.DriverManager.registerDriver(new TSDBDriver());
} catch (SQLException E) {
throw new RuntimeException(TSDBConstants.WrapErrMsg("can't register tdengine jdbc driver!"));
}
}
private TSDBDatabaseMetaData dbMetaData = null;
public Connection connect(String url, Properties info) throws SQLException {
if (url == null) {
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
}
static {
try {
java.sql.DriverManager.registerDriver(new TSDBDriver());
} catch (SQLException E) {
throw new RuntimeException(TSDBConstants.WrapErrMsg("can't register tdengine jdbc driver!"));
}
}
Properties props = null;
private List<String> loadConfigEndpoints(File cfgFile) {
List<String> endpoints = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
endpoints.add(line.substring(line.indexOf('p') + 1).trim());
}
if (endpoints.size() > 1)
break;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return endpoints;
}
if ((props = parseURL(url, info)) == null) {
return null;
}
/**
* @param cfgDirPath
* @return return the config dir
**/
private File loadConfigDir(String cfgDirPath) {
if (cfgDirPath == null)
return loadDefaultConfigDir();
File cfgDir = new File(cfgDirPath);
if (!cfgDir.exists())
return loadDefaultConfigDir();
return cfgDir;
}
try {
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE), (String) props.get(PROPERTY_KEY_CHARSET),
(String) props.get(PROPERTY_KEY_TIME_ZONE));
Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn;
} catch (SQLWarning sqlWarning) {
sqlWarning.printStackTrace();
Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn;
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception ex) {
SQLException sqlEx = new SQLException("SQLException:" + ex.toString());
sqlEx.initCause(ex);
throw sqlEx;
}
}
/**
* @return search the default config dir, if the config dir is not exist will return null
*/
private File loadDefaultConfigDir() {
File cfgDir;
File cfgDir_linux = new File("/etc/taos");
cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
File cfgDir_windows = new File("C:\\TDengine\\cfg");
cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
return cfgDir;
}
/**
* Parses hostPortPair in the form of [host][:port] into an array, with the
* element of index HOST_NAME_INDEX being the host (or null if not specified),
* and the element of index PORT_NUMBER_INDEX being the port (or null if not
* specified).
*
* @param hostPortPair
* host and port in form of of [host][:port]
*
* @return array containing host and port as Strings
*
* @throws SQLException
* if a parse error occurs
*/
protected static String[] parseHostPortPair(String hostPortPair) throws SQLException {
String[] splitValues = new String[2];
public Connection connect(String url, Properties info) throws SQLException {
if (url == null) {
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
}
int portIndex = hostPortPair.indexOf(":");
Properties props = null;
if ((props = parseURL(url, info)) == null) {
return null;
}
String hostname = null;
//load taos.cfg start
if (info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null && info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null){
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile);
if (!endpoints.isEmpty()) {
info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
}
}
if (portIndex != -1) {
if ((portIndex + 1) < hostPortPair.length()) {
String portAsString = hostPortPair.substring(portIndex + 1);
hostname = hostPortPair.substring(0, portIndex);
try {
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE), (String) props.get(PROPERTY_KEY_CHARSET),
(String) props.get(PROPERTY_KEY_TIME_ZONE));
Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn;
} catch (SQLWarning sqlWarning) {
sqlWarning.printStackTrace();
Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn;
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception ex) {
SQLException sqlEx = new SQLException("SQLException:" + ex.toString());
sqlEx.initCause(ex);
throw sqlEx;
}
}
splitValues[HOST_NAME_INDEX] = hostname;
/**
* Parses hostPortPair in the form of [host][:port] into an array, with the
* element of index HOST_NAME_INDEX being the host (or null if not specified),
* and the element of index PORT_NUMBER_INDEX being the port (or null if not
* specified).
*
* @param hostPortPair host and port in form of of [host][:port]
* @return array containing host and port as Strings
* @throws SQLException if a parse error occurs
*/
protected static String[] parseHostPortPair(String hostPortPair) throws SQLException {
String[] splitValues = new String[2];
splitValues[PORT_NUMBER_INDEX] = portAsString;
} else {
throw new SQLException(TSDBConstants.WrapErrMsg("port is not proper!"));
}
} else {
splitValues[HOST_NAME_INDEX] = hostPortPair;
splitValues[PORT_NUMBER_INDEX] = null;
}
int portIndex = hostPortPair.indexOf(":");
return splitValues;
}
String hostname = null;
public boolean acceptsURL(String url) throws SQLException {
return (url != null && url.length() > 0 && url.trim().length() > 0) && url.toLowerCase().startsWith(URL_PREFIX);
}
if (portIndex != -1) {
if ((portIndex + 1) < hostPortPair.length()) {
String portAsString = hostPortPair.substring(portIndex + 1);
hostname = hostPortPair.substring(0, portIndex);
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
if (info == null) {
info = new Properties();
}
splitValues[HOST_NAME_INDEX] = hostname;
if ((url != null) && (url.startsWith(URL_PREFIX) || url.startsWith(URL_PREFIX1))) {
info = parseURL(url, info);
}
splitValues[PORT_NUMBER_INDEX] = portAsString;
} else {
throw new SQLException(TSDBConstants.WrapErrMsg("port is not proper!"));
}
} else {
splitValues[HOST_NAME_INDEX] = hostPortPair;
splitValues[PORT_NUMBER_INDEX] = null;
}
DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST));
hostProp.required = true;
return splitValues;
}
DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT,
info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false;
public boolean acceptsURL(String url) throws SQLException {
return (url != null && url.length() > 0 && url.trim().length() > 0) && url.toLowerCase().startsWith(URL_PREFIX);
}
DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME));
dbProp.required = false;
dbProp.description = "Database name";
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
if (info == null) {
info = new Properties();
}
DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER));
userProp.required = true;
if ((url != null) && (url.startsWith(URL_PREFIX) || url.startsWith(URL_PREFIX1))) {
info = parseURL(url, info);
}
DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD,
info.getProperty(PROPERTY_KEY_PASSWORD));
passwordProp.required = true;
DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST));
hostProp.required = true;
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp;
propertyInfo[1] = portProp;
propertyInfo[2] = dbProp;
propertyInfo[3] = userProp;
propertyInfo[4] = passwordProp;
DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false;
return propertyInfo;
}
DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME));
dbProp.required = false;
dbProp.description = "Database name";
/**
* example: jdbc:TSDB://127.0.0.1:0/db?user=root&password=your_password
*/
DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER));
userProp.required = true;
public Properties parseURL(String url, Properties defaults) throws java.sql.SQLException {
Properties urlProps = (defaults != null) ? defaults : new Properties();
if (url == null) {
return null;
}
DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, info.getProperty(PROPERTY_KEY_PASSWORD));
passwordProp.required = true;
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp;
propertyInfo[1] = portProp;
propertyInfo[2] = dbProp;
propertyInfo[3] = userProp;
propertyInfo[4] = passwordProp;
return propertyInfo;
}
/**
* example: jdbc:TSDB://127.0.0.1:0/db?user=root&password=your_password
*/
public Properties parseURL(String url, Properties defaults) throws java.sql.SQLException {
Properties urlProps = (defaults != null) ? defaults : new Properties();
if (url == null) {
return null;
}
String lowerUrl = url.toLowerCase();
if (!lowerUrl.startsWith(URL_PREFIX) && !lowerUrl.startsWith(URL_PREFIX1)) {
return null;
}
return null;
}
String urlForMeta = url;
String urlForMeta = url;
String dbProductName = url.substring(url.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
int beginningOfSlashes = url.indexOf("//");
url = url.substring(beginningOfSlashes + 2);
String dbProductName = url.substring(url.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
int beginningOfSlashes = url.indexOf("//");
url = url.substring(beginningOfSlashes + 2);
String host = url.substring(0, url.indexOf(":"));
url = url.substring(url.indexOf(":") + 1);
urlProps.setProperty(PROPERTY_KEY_HOST, host);
String host = url.substring(0, url.indexOf(":"));
url = url.substring(url.indexOf(":") + 1);
urlProps.setProperty(PROPERTY_KEY_HOST, host);
String port = url.substring(0, url.indexOf("/"));
urlProps.setProperty(PROPERTY_KEY_PORT, port);
url = url.substring(url.indexOf("/") + 1);
String port = url.substring(0, url.indexOf("/"));
urlProps.setProperty(PROPERTY_KEY_PORT, port);
url = url.substring(url.indexOf("/") + 1);
if (url.indexOf("?") != -1) {
String dbName = url.substring(0, url.indexOf("?"));
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
url = url.trim().substring(url.indexOf("?") + 1);
} else {
// without user & password so return
if(!url.trim().isEmpty()) {
String dbName = url.trim();
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
}
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty("user"));
return urlProps;
}
if (url.indexOf("?") != -1) {
String dbName = url.substring(0, url.indexOf("?"));
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
url = url.trim().substring(url.indexOf("?") + 1);
} else {
// without user & password so return
if (!url.trim().isEmpty()) {
String dbName = url.trim();
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
}
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty("user"));
return urlProps;
}
String user = "";
String user = "";
if (url.indexOf("&") == -1) {
String[] kvPair = url.trim().split("=");
if (kvPair.length == 2) {
setPropertyValue(urlProps, kvPair);
return urlProps;
}
}
if (url.indexOf("&") == -1) {
String[] kvPair = url.trim().split("=");
if (kvPair.length == 2) {
setPropertyValue(urlProps, kvPair);
return urlProps;
}
}
String[] queryStrings = url.trim().split("&");
for (String queryStr : queryStrings) {
String[] kvPair = queryStr.trim().split("=");
if (kvPair.length < 2){
continue;
}
setPropertyValue(urlProps, kvPair);
}
String[] queryStrings = url.trim().split("&");
for (String queryStr : queryStrings) {
String[] kvPair = queryStr.trim().split("=");
if (kvPair.length < 2) {
continue;
}
setPropertyValue(urlProps, kvPair);
}
user = urlProps.getProperty(PROPERTY_KEY_USER).toString();
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user);
user = urlProps.getProperty(PROPERTY_KEY_USER).toString();
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user);
return urlProps;
}
return urlProps;
}
public void setPropertyValue(Properties property, String[] keyValuePair) {
switch (keyValuePair[0].toLowerCase()) {
case PROPERTY_KEY_USER:
property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]);
break;
case PROPERTY_KEY_PASSWORD:
property.setProperty(PROPERTY_KEY_PASSWORD, keyValuePair[1]);
break;
case PROPERTY_KEY_TIME_ZONE:
property.setProperty(PROPERTY_KEY_TIME_ZONE, keyValuePair[1]);
break;
case PROPERTY_KEY_LOCALE:
property.setProperty(PROPERTY_KEY_LOCALE, keyValuePair[1]);
break;
case PROPERTY_KEY_CHARSET:
property.setProperty(PROPERTY_KEY_CHARSET, keyValuePair[1]);
break;
case PROPERTY_KEY_CONFIG_DIR:
property.setProperty(PROPERTY_KEY_CONFIG_DIR, keyValuePair[1]);
break;
}
}
public void setPropertyValue(Properties property, String[] keyValuePair) {
switch (keyValuePair[0].toLowerCase()) {
case PROPERTY_KEY_USER:
property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]);
break;
case PROPERTY_KEY_PASSWORD:
property.setProperty(PROPERTY_KEY_PASSWORD, keyValuePair[1]);
break;
case PROPERTY_KEY_TIME_ZONE:
property.setProperty(PROPERTY_KEY_TIME_ZONE, keyValuePair[1]);
break;
case PROPERTY_KEY_LOCALE:
property.setProperty(PROPERTY_KEY_LOCALE, keyValuePair[1]);
break;
case PROPERTY_KEY_CHARSET:
property.setProperty(PROPERTY_KEY_CHARSET, keyValuePair[1]);
break;
case PROPERTY_KEY_CONFIG_DIR:
property.setProperty(PROPERTY_KEY_CONFIG_DIR, keyValuePair[1]);
break;
}
}
public int getMajorVersion() {
return 1;
}
public int getMajorVersion() {
return 1;
}
public int getMinorVersion() {
return 1;
}
public int getMinorVersion() {
return 1;
}
public boolean jdbcCompliant() {
return false;
}
public boolean jdbcCompliant() {
return false;
}
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
/**
* Returns the host property
*
* @param props
* the java.util.Properties instance to retrieve the hostname from.
*
* @return the host
*/
public String host(Properties props) {
return props.getProperty(PROPERTY_KEY_HOST, "localhost");
}
/**
* Returns the host property
*
* @param props the java.util.Properties instance to retrieve the hostname from.
* @return the host
*/
public String host(Properties props) {
return props.getProperty(PROPERTY_KEY_HOST, "localhost");
}
/**
* Returns the port number property
*
* @param props
* the properties to get the port number from
*
* @return the port number
*/
public int port(Properties props) {
return Integer.parseInt(props.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
}
/**
* Returns the port number property
*
* @param props the properties to get the port number from
* @return the port number
*/
public int port(Properties props) {
return Integer.parseInt(props.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
}
/**
* Returns the database property from <code>props</code>
*
* @param props
* the Properties to look for the database property.
*
* @return the database name.
*/
public String database(Properties props) {
return props.getProperty(PROPERTY_KEY_DBNAME);
}
/**
* Returns the database property from <code>props</code>
*
* @param props the Properties to look for the database property.
* @return the database name.
*/
public String database(Properties props) {
return props.getProperty(PROPERTY_KEY_DBNAME);
}
}

View File

@ -242,7 +242,7 @@ public class TSDBStatement implements Statement {
public void addBatch(String sql) throws SQLException {
if (batchedArgs == null) {
batchedArgs = new ArrayList<String>();
batchedArgs = new ArrayList<>();
}
batchedArgs.add(sql);
}

View File

@ -2,6 +2,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
IF (TD_LINUX)

View File

@ -21,6 +21,7 @@
#include <string.h>
#include "taos.h"
#include "tsclient.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "ttimer.h"
@ -30,10 +31,12 @@
#include "tlog.h"
#include "twal.h"
#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cFatal(...) { if (cqDebugFlag & DEBUG_FATAL) { taosPrintLog("CQ FATAL ", 255, __VA_ARGS__); }}
#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("CQ ERROR ", 255, __VA_ARGS__); }}
#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("CQ WARN ", 255, __VA_ARGS__); }}
#define cInfo(...) { if (cqDebugFlag & DEBUG_INFO) { taosPrintLog("CQ ", 255, __VA_ARGS__); }}
#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cPrint(...) { taosPrintLog("CQ ", 255, __VA_ARGS__); }
typedef struct {
int vgId;
@ -63,8 +66,6 @@ typedef struct SCqObj {
SCqContext * pContext;
} SCqObj;
int cqDebugFlag = 135;
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
@ -94,7 +95,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
pthread_mutex_init(&pContext->mutex, NULL);
cTrace("vgId:%d, CQ is opened", pContext->vgId);
cInfo("vgId:%d, CQ is opened", pContext->vgId);
return pContext;
}
@ -125,7 +126,7 @@ void cqClose(void *handle) {
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
cTrace("vgId:%d, CQ is closed", pContext->vgId);
cInfo("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
}
@ -133,7 +134,7 @@ void cqStart(void *handle) {
SCqContext *pContext = handle;
if (pContext->dbConn || pContext->master) return;
cTrace("vgId:%d, start all CQs", pContext->vgId);
cInfo("vgId:%d, start all CQs", pContext->vgId);
pthread_mutex_lock(&pContext->mutex);
pContext->master = 1;
@ -149,7 +150,7 @@ void cqStart(void *handle) {
void cqStop(void *handle) {
SCqContext *pContext = handle;
cTrace("vgId:%d, stop all CQs", pContext->vgId);
cInfo("vgId:%d, stop all CQs", pContext->vgId);
if (pContext->dbConn == NULL || pContext->master == 0) return;
pthread_mutex_lock(&pContext->mutex);
@ -160,7 +161,7 @@ void cqStop(void *handle) {
if (pObj->pStream) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
cInfo("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
@ -188,7 +189,7 @@ void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSch
pObj->pSchema = tdDupSchema(pSchema);
pObj->rowSize = schemaTLen(pSchema);
cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
cInfo("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
pthread_mutex_lock(&pContext->mutex);
@ -228,7 +229,7 @@ void cqDrop(void *handle) {
pObj->tmrId = 0;
}
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema);
free(pObj->sqlStr);
free(pObj);
@ -236,24 +237,31 @@ void cqDrop(void *handle) {
pthread_mutex_unlock(&pContext->mutex);
}
static void doCreateStream(void *param, TAOS_RES *result, int code) {
SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result;
pContext->dbConn = pSql->pTscObj;
cqCreateStream(pContext, pObj);
}
static void cqProcessCreateTimer(void *param, void *tmrId) {
SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext;
if (pContext->dbConn == NULL) {
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0);
if (pContext->dbConn == NULL) {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
}
cDebug("vgId:%d, try connect to TDengine", pContext->vgId);
taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
} else {
cqCreateStream(pContext, pObj);
}
cqCreateStream(pContext, pObj);
}
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext;
if (pContext->dbConn == NULL) {
cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId);
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl);
return;
}
@ -262,7 +270,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
if (pObj->pStream) {
pContext->num++;
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
}
@ -278,7 +286,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
STSchema *pSchema = pObj->pSchema;
if (pObj->pStream == NULL) return;
cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
char *buffer = calloc(size, 1);

View File

@ -245,7 +245,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 256
#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 8mb
#define TSDB_MAX_BYTES_PER_ROW 16384
#define TSDB_MAX_TAGS_LEN 16384

View File

@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, false, mnodeFreeShowObj, "show");
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, true, mnodeFreeShowObj, "show");
return 0;
}
@ -389,10 +389,12 @@ static bool mnodeAccquireShowObj(SShowObj *pShow) {
}
static void* mnodePutShowObj(SShowObj *pShow) {
const int32_t DEFAULT_SHOWHANDLE_LIFE_SPAN = tsShellActivityTimer * 6 * 1000;
if (tsMnodeShowCache != NULL) {
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
uint64_t handleVal = (uint64_t)pShow;
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(int64_t), &pShow, sizeof(int64_t), 6000);
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(int64_t), &pShow, sizeof(int64_t), DEFAULT_SHOWHANDLE_LIFE_SPAN);
pShow->ppShow = (void**)ppShow;
mDebug("%p, show is put into cache, data:%p index:%d", pShow, ppShow, pShow->index);
return pShow;

View File

@ -1384,6 +1384,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
}
pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 5;
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
mnodeDecDbRef(pDb);
return numOfRows;
@ -2122,8 +2125,8 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
}
pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId);
mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->uid);
mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->uid, pTable->vgId, pTable->sid);
return TSDB_CODE_SUCCESS;
}

View File

@ -230,4 +230,5 @@ void httpCloseContextByServer(HttpContext *pContext) {
pContext->parsed = false;
httpRemoveContextFromEpoll(pContext);
httpReleaseContext(pContext, true);
}

View File

@ -257,20 +257,20 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo
HttpEncodeMethod *encode = pContext->encodeMethod;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd,
pContext->user, pContext->session->taos, tstrerror(code), (SSqlObj *)result);
httpError("context:%p, fd:%d, user:%s, query error, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd,
pContext->user, tstrerror(code), (SSqlObj *)result);
return;
}
if (code < 0) {
SSqlObj *pObj = (SSqlObj *)result;
if (code == TSDB_CODE_TSC_INVALID_SQL) {
httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p, error:%s", pContext,
pContext->fd, pContext->user, pContext->session->taos, tstrerror(code), pObj, pObj->cmd.payload);
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext,
pContext->fd, pContext->user, tstrerror(code), pObj, pObj->cmd.payload);
httpSendTaosdInvalidSqlErrorResp(pContext, pObj->cmd.payload);
} else {
httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p", pContext, pContext->fd,
pContext->user, pContext->session->taos, tstrerror(code), pObj);
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd,
pContext->user, tstrerror(code), pObj);
httpSendErrorResp(pContext, code);
}
taos_free_result(result);

View File

@ -73,12 +73,11 @@ typedef struct SDiskbasedResultBuf {
bool comp; // compressed before flushed to disk
int32_t nextPos; // next page flush position
const void* handle; // for debug purpose
const void* handle; // for debug purpose
SResultBufStatis statis;
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (4096L)
#define DEFAULT_INMEM_BUF_PAGES 10
#define DEFAULT_INTERN_BUF_PAGE_SIZE (256L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
/**

View File

@ -39,7 +39,6 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf
}
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_TIMEWINDOW(_winresInfo, _win) (STimeWindow) {(_win)->skey, ((_win)->skey + (_winresInfo)->interval - 1)}
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);

View File

@ -108,7 +108,7 @@ extern "C" {
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define MAX_INTERVAL_TIME_WINDOW 10000000
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define TOP_BOTTOM_QUERY_LIMIT 100
enum {

View File

@ -187,7 +187,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData,
SDataStatis *pStatis, void *param, int32_t colIndex);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery);
static void buildTagQueryResult(SQInfo *pQInfo);
@ -782,6 +782,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool hasPrev = pCtx[0].preAggVals.isSet;
if (IS_MASTER_SCAN(pRuntimeEnv) || closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
@ -794,11 +796,17 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed
}
// not a whole block involved in query processing, statistics data can not be used
pCtx[k].preAggVals.isSet = (forwardStep == numOfTotal);
// NOTE: the original value of isSet have been changed here
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false;
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
// restore it
pCtx[k].preAggVals.isSet = hasPrev;
}
}
}
@ -1975,8 +1983,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
// todo handle the case the the order irrelevant query type mixed up with order critical query type
// descending order query for last_row query
if (isFirstLastRowQuery(pQuery)) {
qDebug("QInfo:%p scan order changed for last_row query, old:%d, new:%d", GET_QINFO_ADDR(pQuery),
pQuery->order.order, TSDB_ORDER_ASC);
qDebug("QInfo:%p scan order changed for last_row query, old:%d, new:%d", pQInfo, pQuery->order.order, TSDB_ORDER_ASC);
pQuery->order.order = TSDB_ORDER_ASC;
if (pQuery->window.skey > pQuery->window.ekey) {
@ -2078,13 +2085,14 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t MIN_ROWS_PER_PAGE = 4;
*rowsize = (int32_t)(pQuery->rowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
int32_t overhead = sizeof(tFilePage);
// one page contains at least two rows
*ps = DEFAULT_INTERN_BUF_PAGE_SIZE;
while(((*rowsize) * 2) > (*ps) - overhead) {
while(((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) {
*ps = (*ps << 1u);
}
@ -2112,7 +2120,7 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
}
}
// no statistics data
// no statistics data, load the true data block
if (index == -1) {
return true;
}
@ -2122,8 +2130,17 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
return true;
}
// all points in current column are NULL, no need to check its boundary value
// all data in current column are NULL, no need to check its boundary value
if (pDataStatis[index].numOfNull == numOfRows) {
// if isNULL query exists, load the null data column
for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) {
SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j];
if (pFilterElem->fp == isNull_filter) {
return true;
}
}
continue;
}
@ -2949,11 +2966,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
STableQueryInfo *item = taosArrayGetP(pGroup, i);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
pageList = list;
tid = TSDB_TABLEID(item->pTable)->tid;
if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) {
pTableList[numOfTables++] = item;
tid = TSDB_TABLEID(item->pTable)->tid;
pageList = list;
}
}
@ -3707,7 +3723,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
return pTableQueryInfo;
}
void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo) {
void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
}
@ -4383,6 +4399,8 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
return true;
}
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
@ -4422,16 +4440,20 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
// update the query time window
pQuery->window = cond.twindow;
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = GET_TABLEGROUP(pQInfo, i);
if (pQInfo->tableGroupInfo.numOfTables == 0) {
pQInfo->tableqinfoGroupInfo.numOfTables = 0;
} else {
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = GET_TABLEGROUP(pQInfo, i);
size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) {
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) {
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
pCheckInfo->win = pQuery->window;
pCheckInfo->lastKey = pCheckInfo->win.skey;
pCheckInfo->win = pQuery->window;
pCheckInfo->lastKey = pCheckInfo->win.skey;
}
}
}
} else if (isPointInterpoQuery(pQuery)) {
@ -6317,17 +6339,43 @@ _error:
}
static void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
if (pFilter == NULL) {
if (pFilter == NULL || numOfFilters == 0) {
return;
}
for (int32_t i = 0; i < numOfFilters; i++) {
if (pFilter[i].filterstr) {
free((void*)(pFilter[i].pz));
}
}
free(pFilter);
}
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) {
if (pTableqinfoGroupInfo->pGroupList != NULL) {
int32_t numOfGroups = (int32_t) taosArrayGetSize(pTableqinfoGroupInfo->pGroupList);
for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = taosArrayGetP(pTableqinfoGroupInfo->pGroupList, i);
size_t num = taosArrayGetSize(p);
for(int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(p, j);
destroyTableQueryInfoImpl(item);
}
taosArrayDestroy(p);
}
}
taosArrayDestroy(pTableqinfoGroupInfo->pGroupList);
taosHashCleanup(pTableqinfoGroupInfo->map);
pTableqinfoGroupInfo->pGroupList = NULL;
pTableqinfoGroupInfo->map = NULL;
pTableqinfoGroupInfo->numOfTables = 0;
}
static void freeQInfo(SQInfo *pQInfo) {
if (!isValidQInfo(pQInfo)) {
return;
@ -6388,25 +6436,9 @@ static void freeQInfo(SQInfo *pQInfo) {
taosTFree(pQuery);
}
// todo refactor, extract method to destroytableDataInfo
if (pQInfo->tableqinfoGroupInfo.pGroupList != NULL) {
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = GET_TABLEGROUP(pQInfo, i);
size_t num = taosArrayGetSize(p);
for(int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(p, j);
destroyTableQueryInfo(item);
}
taosArrayDestroy(p);
}
}
doDestroyTableQueryInfo(&pQInfo->tableqinfoGroupInfo);
taosTFree(pQInfo->pBuf);
taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList);
taosHashCleanup(pQInfo->tableqinfoGroupInfo.map);
tsdbDestroyTableGroup(&pQInfo->tableGroupInfo);
taosArrayDestroy(pQInfo->arrTableIdInfo);

View File

@ -38,6 +38,9 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
pFillInfo->numOfTags = numOfTags;
pFillInfo->numOfCols = numOfCols;
pFillInfo->precision = precision;
pFillInfo->interval.interval = slidingTime;
pFillInfo->interval.intervalUnit = slidingUnit;
pFillInfo->interval.sliding = slidingTime;
pFillInfo->interval.slidingUnit = slidingUnit;

View File

@ -580,6 +580,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle;
pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
if (pConn->chandle == NULL) {
tError("failed to connect to:0x%x:%d", pConn->peerIp, pConn->peerPort);
terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcCloseConn(pConn);
pConn = NULL;

View File

@ -511,9 +511,9 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
sInfo("%s, it is configured", pPeer->id);
int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, start to check peer connection", pPeer->id);
int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
if (pNode->vgId) checkMs = tsStatusInterval * 2000 + 100;
if (pNode->vgId > 1) checkMs = tsStatusInterval * 2000 + checkMs;
sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs);
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
}

View File

@ -2114,6 +2114,8 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
totalNumOfTable++;
taosArrayPush(pGroup, &keyInfo);
} else {
taosArrayDestroy(pGroup);
taosArrayRemove(groupList->pGroupList, j);
numOfGroups -= 1;
j -= 1;
@ -2736,4 +2738,5 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
}
taosArrayDestroy(pGroupList->pGroupList);
pGroupList->numOfTables = 0;
}

View File

@ -260,7 +260,12 @@ static void incRefFn(void* ptNode) {
}
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0 || pCacheObj->deleting == 1) {
if (pCacheObj == NULL || pCacheObj->deleting == 1) {
return NULL;
}
if (taosHashGetSize(pCacheObj->pHashTable) == 0) {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
return NULL;
}

View File

@ -58,7 +58,8 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return TSDB_CODE_APP_NOT_READY;
// TODO: Later, let slave to support query
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_APP_NOT_READY;
}

View File

@ -53,7 +53,7 @@ function buildTDengine {
function runGeneralCaseOneByOne {
while read -r line; do
if [[ $line =~ ^./test.sh* ]]; then
case=`echo $line | grep -w "general\|unique\/mnode\/mgmt33.sim\|unique\/stable\/dnode3.sim\|unique\/cluster\/balance3.sim\|unique\/arbitrator\/offline_replica2_alterTable_online.sim"|awk '{print $NF}'`
case=`echo $line | grep sim$ |awk '{print $NF}'`
if [ -n "$case" ]; then
./test.sh -f $case > /dev/null 2>&1 && \

View File

@ -0,0 +1,96 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import sys
sys.path.insert(0, os.getcwd())
from util.log import *
from util.sql import *
from util.dnodes import *
import taos
class TwoClients:
def initConnection(self):
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/home/xp/git/TDengine/sim/dnode1/cfg"
def run(self):
tdDnodes.init("")
tdDnodes.setTestCluster(False)
tdDnodes.setValgrind(False)
tdDnodes.stopAll()
tdDnodes.deploy(1)
tdDnodes.start(1)
# first client create a stable and insert data
conn1 = taos.connect(self.host, self.user, self.password, self.config)
cursor1 = conn1.cursor()
cursor1.execute("drop database if exists db")
cursor1.execute("create database db")
cursor1.execute("use db")
cursor1.execute("create table tb (ts timestamp, id int) tags(loc nchar(30))")
cursor1.execute("insert into t0 using tb tags('beijing') values(now, 1)")
# second client alter the table created by cleint
conn2 = taos.connect(self.host, self.user, self.password, self.config)
cursor2 = conn2.cursor()
cursor2.execute("use db")
cursor2.execute("alter table tb add column name nchar(30)")
# first client should not be able to use the origin metadata
tdSql.init(cursor1, True)
tdSql.error("insert into t0 values(now, 2)")
# first client should be able to insert data with udpated medadata
tdSql.execute("insert into t0 values(now, 2, 'test')")
tdSql.query("select * from tb")
tdSql.checkRows(2)
# second client drop the table
cursor2.execute("drop table t0")
cursor2.execute("create table t0 using tb tags('beijing')")
tdSql.execute("insert into t0 values(now, 2, 'test')")
tdSql.query("select * from tb")
tdSql.checkRows(1)
# error expected for two clients drop the same cloumn
cursor2.execute("alter table tb drop column name")
tdSql.error("alter table tb drop column name")
cursor2.execute("alter table tb add column speed int")
tdSql.error("alter table tb add column speed int")
tdSql.execute("alter table tb add column size int")
tdSql.query("describe tb")
tdSql.checkRows(5)
tdSql.checkData(0, 0, "ts")
tdSql.checkData(1, 0, "id")
tdSql.checkData(2, 0, "speed")
tdSql.checkData(3, 0, "size")
tdSql.checkData(4, 0, "loc")
cursor1.close()
cursor2.close()
conn1.close()
conn2.close()
clients = TwoClients()
clients.initConnection()
clients.run()

View File

@ -16,6 +16,7 @@ python3 ./test.py -f insert/nchar.py
python3 ./test.py -f insert/nchar-unicode.py
python3 ./test.py -f insert/multi.py
python3 ./test.py -f insert/randomNullCommit.py
python3 insert/retentionpolicy.py
python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
@ -187,6 +188,13 @@ python3 ./test.py -f functions/function_top.py
#python3 ./test.py -f functions/function_twa.py
python3 queryCount.py
python3 ./test.py -f query/queryGroupbyWithInterval.py
python3 client/twoClients.py
python3 test.py -f query/queryInterval.py
# tools
python3 test.py -f tools/taosdemo.py
# subscribe
python3 test.py -f subscribe/singlemeter.py
python3 test.py -f subscribe/stability.py
python3 test.py -f subscribe/supertable.py

View File

@ -0,0 +1,108 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
import datetime
sys.path.insert(0, os.getcwd())
import taos
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestRetetion:
def init(self):
self.queryRows=0
tdLog.debug("start to execute %s" % __file__)
tdLog.info("prepare cluster")
tdDnodes.init("")
tdDnodes.setTestCluster(False)
tdDnodes.setValgrind(False)
tdDnodes.stopAll()
tdDnodes.deploy(1)
tdDnodes.start(1)
print(tdDnodes.getDnodesRootDir())
self.conn = taos.connect(config=tdDnodes.getSimCfgPath())
tdSql.init(self.conn.cursor())
tdSql.execute('reset query cache')
def checkRows(self, expectRows,sql):
if self.queryRows == expectRows:
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (sql, self.queryRows, expectRows))
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, self.queryRows, expectRows)
os.system("timedatectl set-ntp true")
tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
def run(self):
tdLog.info("=============== step1")
tdSql.execute('create database test keep 3 days 1;')
tdSql.execute('use test;')
tdSql.execute('create table test(ts timestamp,i int);')
cmd = 'insert into test values(now-2d,11)(now-1d,11)(now,11)(now+1d,11);'
tdLog.info(cmd)
tdSql.execute(cmd)
tdSql.query('select * from test')
tdSql.checkRows(4)
tdLog.info("=============== step2")
tdDnodes.stop(1)
os.system("timedatectl set-ntp false")
os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
tdDnodes.start(1)
cmd = 'insert into test values(now,11);'
tdLog.info(cmd)
tdSql.execute(cmd)
tdSql.query('select * from test')
tdSql.checkRows(5)
tdLog.info("=============== step3")
tdDnodes.stop(1)
os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
tdDnodes.start(1)
cmd = 'insert into test values(now-1d,11);'
tdLog.info(cmd)
tdSql.execute(cmd)
tdSql.query('select * from test')
tdSql.checkRows(6)
tdLog.info("=============== step4")
tdDnodes.stop(1)
tdDnodes.start(1)
cmd = 'insert into test values(now,11);'
tdLog.info(cmd)
tdSql.execute(cmd)
tdSql.query('select * from test')
tdSql.checkRows(7)
tdLog.info("=============== step5")
tdDnodes.stop(1)
tdDnodes.start(1)
cmd='select * from test where ts > now-1d'
queryRows=tdSql.query('select * from test where ts > now-1d')
self.checkRows(1,cmd)
def stop(self):
os.system("timedatectl set-ntp true")
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
clients = TDTestRetetion()
clients.init()
clients.run()
clients.stop()

View File

@ -0,0 +1,62 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1538548685000
def run(self):
tdSql.prepare()
tdSql.execute("create table st (ts timestamp, voltage int) tags (loc nchar(30))")
tdSql.execute("insert into t0 using st tags('beijing') values(now, 220) (now - 15d, 221) (now - 30d, 225) (now - 35d, 228) (now - 45d, 222)")
tdSql.execute("insert into t1 using st tags('shanghai') values(now, 220) (now - 60d, 221) (now - 50d, 225) (now - 40d, 228) (now - 20d, 222)")
tdSql.query("select avg(voltage) from st interval(1n)")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 223.0)
tdSql.checkData(1, 1, 225.0)
tdSql.checkData(2, 1, 220.333333)
tdSql.query("select avg(voltage) from st interval(1n, 15d)")
tdSql.checkRows(3)
tdSql.checkData(0, 1, 224.8)
tdSql.checkData(1, 1, 222.666666)
tdSql.checkData(2, 1, 220.0)
tdSql.query("select avg(voltage) from st interval(1n, 15d) group by loc")
tdSql.checkRows(6)
tdSql.checkData(0, 1, 225.0)
tdSql.checkData(1, 1, 223.0)
tdSql.checkData(2, 1, 220.0)
tdSql.checkData(3, 1, 224.666666)
tdSql.checkData(4, 1, 222.0)
tdSql.checkData(5, 1, 220.0)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -123,8 +123,8 @@ class TDSql:
def checkData(self, row, col, data):
self.checkRowCol(row, col)
if self.queryResult[row][col] != data:
if str(self.queryResult[row][col]) != str(data):
if self.queryResult[row][col] != data:
if str(self.queryResult[row][col]) == str(data):
tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data))
return

View File

@ -3,7 +3,7 @@ sleep 3000
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 0
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode1 -c maxSQLLength -v 7340032
system sh/cfg.sh -n dnode1 -c maxSQLLength -v 340032
system sh/exec.sh -n dnode1 -s start
sleep 3000

View File

@ -850,6 +850,8 @@ if $rows != 12 then
return -1
endi
print =====================>td-1442
sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev);
print =============== clear
sql drop database $db

View File

@ -65,22 +65,23 @@ endi
if $data00 != @18-09-18 01:40:00.000@ then
return -1
endi
#if $data01 != NULL then
if $data01 != 999 then
return -1
endi
#if $data02 != NULL then
endi
if $data02 != 999 then
return -1
endi
#if $data03 != NULL then
endi
if $data03 != 999.00000 then
return -1
endi
#if $data04 != NULL then
if $data04 != 999.000000000 then
return -1
endi
#if $data05 != NULL then
if $data05 != 999 then
return -1
@ -127,7 +128,7 @@ if $data01 != 0 then
return -1
endi
#add check for out of range first/last query
print =============> add check for out of range first/last query
sql select first(ts),last(ts) from first_tb4 where ts>'2018-9-18 1:40:01';
if $row != 0 then
return -1
@ -136,4 +137,130 @@ endi
sql select first(ts),last(ts) from first_tb4 where ts<'2018-9-17 8:50:0';
if $row != 0 then
return -1
endi
#first/last mix up query
#select first(size),last(size) from stest interval(1d) group by tbname;
print =====================>td-1477
sql create table stest(ts timestamp,size INT,filenum INT) tags (appname binary(500),tenant binary(500));
sql insert into test1 using stest tags('test1','aaa') values ('2020-09-04 16:53:54.003',210,3);
sql insert into test2 using stest tags('test1','aaa') values ('2020-09-04 16:53:56.003',210,3);
sql insert into test11 using stest tags('test11','bbb') values ('2020-09-04 16:53:57.003',210,3);
sql insert into test12 using stest tags('test11','bbb') values ('2020-09-04 16:53:58.003',210,3);
sql insert into test21 using stest tags('test21','ccc') values ('2020-09-04 16:53:59.003',210,3);
sql insert into test22 using stest tags('test21','ccc') values ('2020-09-04 16:54:54.003',210,3);
sql select sum(size) from stest group by appname;
if $rows != 3 then
return -1
endi
if $data00 != 420 then
return -1
endi
if $data10 != 420 then
return -1
endi
if $data20 != 420 then
return -1
endi
if $data01 != @test1@ then
return -1
endi
if $data11 != @test11@ then
return -1
endi
if $data21 != @test21@ then
return -1
endi
sql select sum(size) from stest interval(1d) group by appname;
if $rows != 3 then
return -1
endi
#2020-09-04 00:00:00.000 | 420 | test1 |
#2020-09-04 00:00:00.000 | 420 | test11 |
#2020-09-04 00:00:00.000 | 420 | test21 |
if $data00 != @20-09-04 00:00:00.000@ then
return -1
endi
if $data10 != @20-09-04 00:00:00.000@ then
return -1
endi
if $data20 != @20-09-04 00:00:00.000@ then
return -1
endi
if $data01 != 420 then
print expect 420 , actual $data01
return -1
endi
if $data11 != 420 then
return -1
endi
if $data21 != 420 then
return -1
endi
if $data02 != @test1@ then
return -1
endi
if $data12 != @test11@ then
return -1
endi
if $data22 != @test21@ then
return -1
endi
print ===================>td-1477, one table has only one block occurs this bug.
sql select first(size),count(*),LAST(SIZE) from stest where tbname in ('test1', 'test2') interval(1d) group by tbname;
if $rows != 2 then
return -1
endi
if $data00 != @20-09-04 00:00:00.000@ then
return -1
endi
if $data01 != 210 then
return -1
endi
if $data02 != 1 then
return -1
endi
if $data03 != 210 then
return -1
endi
if $data04 != @test1@ then
return -1
endi
if $data10 != @20-09-04 00:00:00.000@ then
return -1
endi
if $data11 != 210 then
return -1
endi
if $data12 != 1 then
return -1
endi
if $data13 != 210 then
return -1
endi
if $data14 != @test2@ then
print expect test2 , actual: $data14
return -1
endi

View File

@ -218,4 +218,10 @@ endi
if $data04 != 123.981000000 then
print expect 123.981000000, actual: $data04
return -1
endi
sql create table tu(ts timestamp, k int)
sql select last_row(*) from tu
if $row != 0 then
return -1
endi

View File

@ -308,13 +308,25 @@ sleep 2000
system sh/exec.sh -n dnode1 -s start
sql select * from wh_mt0 where c3 = 'abc' and tbname in ('test_null_filter');
sql_error select * from wh_mt0 where c3 = 'abc' and tbname in ('test_null_filter');
sql select * from wh_mt0 where c3 = '1' and tbname in ('test_null_filter');
if $row != 0 then
return -1
endi
sql select * from wh_mt0 where c3 = 'abc' and tbname in ('test_null_filter');
if $row != 0 then
sql select * from wh_mt0 where c3 = '1';
if $row == 0 then
return -1
endi
sql select * from wh_mt0 where c3 is null and tbname in ('test_null_filter');
if $rows != 10000 then
return -1
endi
sql select * from wh_mt0 where c3 is not null and tbname in ('test_null_filter');
if $rows != 0 then
return -1
endi

View File

@ -128,6 +128,7 @@ echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG
echo "wdebugFlag 135" >> $TAOS_CFG
echo "cqdebugFlag 135" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG

View File

@ -34,11 +34,11 @@ system sh/cfg.sh -n dnode4 -c http -v 1
return
# for crash_gen
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 10
system sh/cfg.sh -n dnode1 -c rpcMaxTime -v 101
system sh/cfg.sh -n dnode1 -c cache -v 2
system sh/cfg.sh -n dnode1 -c keep -v 36500
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c walLevel -v 1
# for windows

View File

@ -42,9 +42,11 @@ $count = 2
while $count < 102
$db = d . $count
$tb = $db . .t
$tb2 = $db . .t2
sql create database $db replica 3 cache 1 blocks 3
sql create table $tb (ts timestamp, i int)
sql insert into $tb values(now, 1)
sql create table $tb2 as select count(*) from $tb interval(10s)
$count = $count + 1
print insert into $tb values(now, 1) ==> finished
endw
@ -74,7 +76,7 @@ print ============================== step6
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sleep 3000
sleep 10000
print ============================== step7

View File

@ -9,8 +9,9 @@ NC='\033[0m'
function runSimCaseOneByOne {
while read -r line; do
if [[ $line =~ ^run.* ]]; then
case=`echo $line | awk '{print $NF}'`
if [[ $line =~ ^./test.sh* ]]; then
case=`echo $line | grep sim$ |awk '{print $NF}'`
start_time=`date +%s`
./test.sh -f $case > /dev/null 2>&1 && \
echo -e "${GREEN}$case success${NC}" | tee -a out.log || \
@ -54,7 +55,7 @@ if [ "$2" != "python" ]; then
runSimCaseOneByOne regressionSuite.sim
elif [ "$1" == "full" ]; then
echo "### run TSIM full test ###"
runSimCaseOneByOne fullGeneralSuite.sim
runSimCaseOneByOne jenkins/basic.txt
elif [ "$1" == "smoke" ] || [ -z "$1" ]; then
echo "### run TSIM smoke test ###"
runSimCaseOneByOne basicSuite.sim