Merge remote-tracking branch 'origin/develop' into feature/crash_gen

This commit is contained in:
Steven Li 2020-09-24 07:08:33 +00:00
commit d1e2f0209c
71 changed files with 3104 additions and 1826 deletions

View File

@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.0.3.0")
SET(TD_VER_NUMBER "2.0.4.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)

View File

@ -1019,5 +1019,5 @@ SELECT AVG(current),MAX(current),LEASTSQUARES(current, start_val, step_val), PER
- 表名最大长度为193每行数据最大长度16k个字符
- 列名最大长度为65最多允许1024列最少需要2列第一列必须是时间戳
- 标签最多允许128个可以0个标签总长度不超过16k个字符
- SQL语句最大长度65480个字符但可通过系统配置参数maxSQLLength修改最长可配置为8M
- SQL语句最大长度65480个字符但可通过系统配置参数maxSQLLength修改最长可配置为1M
- 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制

View File

@ -67,7 +67,7 @@ TDengine 分布式架构的逻辑结构图如下:
<center> 图 1 TDengine架构示意图 </center>
一个完整的 TDengine 系统是运行在一到多个物理节点上的,逻辑上,它包含数据节点(dnode)、TDengine客户端(taosc)以及应用(app)。系统中存在一到多个数据节点,这些数据节点组成一个集群(cluster)。应用通过taosc的API与TDengine集群进行互动。下面对每个逻辑单元进行简要介绍。
**物理节点(pnode):** pnode是一独立运行、拥有自己的计算、存储和网络能力的计算机可以是安装有OS的物理机、虚拟机或容器。物理节点由其配置的 FQDN(Fully Qualified Domain Name)来标识。
**物理节点(pnode):** pnode是一独立运行、拥有自己的计算、存储和网络能力的计算机可以是安装有OS的物理机、虚拟机或容器。物理节点由其配置的 FQDN(Fully Qualified Domain Name)来标识。TDengine完全依赖FQDN来进行网络通讯如果不了解FQDN请看博文《[一篇文章说清楚TDengine的FQDN](https://www.taosdata.com/blog/2020/09/11/1824.html)》。
**数据节点(dnode):** dnode 是 TDengine 服务器侧执行代码 taosd 在物理节点上的一个运行实例一个工作的系统必须有至少一个数据节点。dnode包含零到多个逻辑的虚拟节点(VNODE),零或者至多一个逻辑的管理节点(mnode)。dnode在系统中的唯一标识由实例的End Point (EP )决定。EP是dnode所在物理节点的FQDN (Fully Qualified Domain Name)和系统所配置的网络端口号(Port)的组合。通过配置不同的端口,一个物理节点(一台物理机、虚拟机或容器)可以运行多个实例,或有多个数据节点。

View File

@ -22,7 +22,7 @@ TDengine的集群管理极其简单除添加和删除节点需要人工干预
1. 每个物理节点上执行命令`hostname -f`查看和确认所有节点的hostname是不相同的(应用驱动所在节点无需做此项检查)
2. 每个物理节点上执行`ping host`, 其中host是其他物理节点的hostname, 看能否ping通其它物理节点; 如果不能ping通需要检查网络设置, 或/etc/hosts文件(Windows系统默认路径为C:\Windows\system32\drivers\etc\hosts)或DNS的配置。如果无法ping通是无法组成集群的
3. 从应用运行的物理节点ping taosd运行的数据节点如果无法应用是无法连接taosd的请检查应用所在物理节点的DNS设置或hosts文件
3. 从应用运行的物理节点ping taosd运行的数据节点如果无法ping应用是无法连接taosd的请检查应用所在物理节点的DNS设置或hosts文件
4. 每个数据节点的End Point就是输出的hostname外加端口号比如h1.taosdata.com:6030
**第五步**修改TDengine的配置文件所有节点的文件/etc/taos/taos.cfg都需要修改。假设准备启动的第一个数据节点End Point为 h1.taosdata.com:6030, 其与集群配置相关参数如下:
@ -31,8 +31,8 @@ TDengine的集群管理极其简单除添加和删除节点需要人工干预
// firstEp 是每个数据节点首次启动后连接的第一个数据节点
firstEp h1.taosdata.com:6030
// 配置本数据节点的FQDN如果本机只有一个hostname, 无需配置
fqdn h1.taosdata.com
// 必须配置本数据节点的FQDN如果本机只有一个hostname, 可注释掉本配置
fqdn h1.taosdata.com
// 配置本数据节点的端口号缺省是6030
serverPort 6030
@ -41,7 +41,7 @@ serverPort 6030
arbitrator ha.taosdata.com:6042
```
一定要修改的参数是firstEp和fqdn, 其他参数可不做任何修改,除非你很清楚为什么要修改。
一定要修改的参数是firstEp和fqdn。在每个数据节点firstEp需全部配置成一样**但fqdn一定要配置成其所在数据节点的值**。其他参数可不做任何修改,除非你很清楚为什么要修改。
**加入到集群中的数据节点dnode涉及集群相关的下表11项参数必须完全相同否则不能成功加入到集群中。**

View File

@ -25,6 +25,7 @@ INSERT INTO d1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6,
- 要提高写入效率需要批量写入。一批写入的记录条数越多插入效率就越高。但一条记录不能超过16K一条SQL语句总长度不能超过64K可通过参数maxSQLLength配置最大可配置为8M
- TDengine支持多线程同时写入要进一步提高写入速度一个客户端需要打开20个以上的线程同时写。但线程数达到一定数量后无法再提高甚至还会下降因为线程切频繁切换带来额外开销。
- 对同一张表,如果新插入记录的时间戳已经存在,新记录将被直接抛弃,也就是说,在一张表里,时间戳必须是唯一的。如果应用自动生成记录,很有可能生成的时间戳是一样的,这样,成功插入的记录条数会小于应用插入的记录条数。
- 写入的数据的时间戳必须大于当前时间减去配置参数keep的时间。如果keep配置为3650天那么无法写入比3650天还老的数据。写入数据的时间戳也不能大于当前时间加配置参数days。如果days配置为2那么无法写入比当前时间还晚2天的数据。
## Prometheus直接写入
[Prometheus](https://www.prometheus.io/)作为Cloud Native Computing Fundation毕业的项目在性能监控以及K8S性能监控领域有着非常广泛的应用。TDengine提供一个小工具[Bailongma](https://github.com/taosdata/Bailongma)只需在Prometheus做简单配置无需任何代码就可将Prometheus采集的数据直接写入TDengine并按规则在TDengine自动创建库和相关表项。博文[用Docker容器快速搭建一个Devops监控Demo](https://www.taosdata.com/blog/2020/02/03/1189.html)即是采用bailongma将Prometheus和Telegraf的数据写入TDengine中的示例可以参考。

View File

@ -191,6 +191,7 @@ SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
void tscColumnListDestroy(SArray* pColList);
void tscDequoteAndTrimToken(SStrToken* pToken);
int32_t tscValidateName(SStrToken* pToken);
void tscIncStreamExecutionCount(void* pStream);
@ -257,6 +258,8 @@ void tscDoQuery(SSqlObj* pSql);
*/
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd);
void registerSqlObj(SSqlObj* pSql);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);

View File

@ -89,7 +89,7 @@ typedef struct STableComInfo {
typedef struct SCMCorVgroupInfo {
int32_t version;
int8_t inUse;
int8_t inUse;
int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SCMCorVgroupInfo;
@ -99,6 +99,7 @@ typedef struct STableMeta {
uint8_t tableType;
int16_t sversion;
int16_t tversion;
char sTableId[TSDB_TABLE_FNAME_LEN];
SCMVgroupInfo vgroupInfo;
SCMCorVgroupInfo corVgroupInfo;
STableId id;
@ -106,7 +107,7 @@ typedef struct STableMeta {
} STableMeta;
typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquired by name
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>

View File

@ -51,10 +51,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->fp = fp;
pSql->fetchFp = fp;
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
registerSqlObj(pSql);
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {

View File

@ -326,7 +326,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*interBytes = dataBytes + sizeof(SLastrowInfo);
*interBytes = dataBytes;
} else {
return TSDB_CODE_TSC_INVALID_SQL;
}
@ -1843,8 +1843,10 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
pInfo1->hasResult = DATA_SET_FLAG;
DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts);
} else {
DO_UPDATE_TAG_COLUMNS(pCtx, pCtx->ptsList[pCtx->size - 1]);
}
SET_VAL(pCtx, pCtx->size, 1);
}

View File

@ -23,7 +23,30 @@
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "taos.h"
#include "tscSubquery.h"
#define STR_NOCASE_EQUAL(str1, len1, str2, len2) ((len1 == len2) && 0 == strncasecmp(str1, str2, len1))
typedef enum BuildType {
SCREATE_BUILD_TABLE = 1,
SCREATE_BUILD_DB = 2,
} BuildType;
typedef enum Stage {
SCREATE_CALLBACK_QUERY = 1,
SCREATE_CALLBACK_RETRIEVE = 2,
} Stage;
// support 'show create table'
typedef struct SCreateBuilder {
char sTableName[TSDB_TABLE_FNAME_LEN];
char buf[TSDB_TABLE_FNAME_LEN];
SSqlObj *pParentSql;
SSqlObj *pInterSql;
int32_t (*fp)(void *para, char* result);
Stage callStage;
} SCreateBuilder;
static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength);
static int32_t getToStringLength(const char *pData, int32_t length, int32_t type) {
@ -272,14 +295,511 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
tscFieldInfoUpdateOffset(pQueryInfo);
return tscSetValueToResObj(pSql, rowLen);
}
static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengths, int idx, char *result) {
const char *val = row[idx];
if (val == NULL) {
sprintf(result, "%s", TSDB_DATA_NULL_STR);
return -1;
}
uint8_t type = fields[idx].type;
int32_t length = lengths[idx];
switch (type) {
case TSDB_DATA_TYPE_BOOL:
sprintf(result, "%s", ((((int)(*((char *)val))) == 1) ? "true" : "false"));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(result, "%d", (int)(*((char *)val)));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(result, "%d", (int)(*((short *)val)));
break;
case TSDB_DATA_TYPE_INT:
sprintf(result, "%d", *((int *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(result, "%"PRId64, *((int64_t *)val));
break;
case TSDB_DATA_TYPE_FLOAT:
sprintf(result, "%f", GET_FLOAT_VAL(val));
break;
case TSDB_DATA_TYPE_DOUBLE:
sprintf(result, "%f", GET_DOUBLE_VAL(val));
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY:
memcpy(result, val, length);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
///formatTimestamp(buf, *(int64_t*)val, TSDB_TIME_PRECISION_MICRO);
//memcpy(result, val, strlen(buf));
sprintf(result, "%"PRId64, *((int64_t *)val));
break;
default:
break;
}
return 0;
}
void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) {
if (param == NULL || tres == NULL) {
return;
}
SCreateBuilder *builder = (SCreateBuilder *)(param);
SSqlObj *pParentSql = builder->pParentSql;
SSqlObj *pSql = (SSqlObj *)tres;
SSqlRes *pRes = &pParentSql->res;
pRes->code = taos_errno(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
taos_free_result(pSql);
free(builder);
tscQueueAsyncRes(pParentSql);
return;
}
if (builder->callStage == SCREATE_CALLBACK_QUERY) {
taos_fetch_rows_a(tres, tscSCreateCallBack, param);
builder->callStage = SCREATE_CALLBACK_RETRIEVE;
} else {
char *result = calloc(1, TSDB_MAX_BINARY_LEN);
pRes->code = builder->fp(builder, result);
taos_free_result(pSql);
free(builder);
free(result);
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, code);
} else {
tscQueueAsyncRes(pParentSql);
}
}
}
TAOS_ROW tscFetchRow(void *param) {
SCreateBuilder *builder = (SCreateBuilder *)param;
if (builder == NULL) {
return NULL;
}
SSqlObj *pSql = builder->pInterSql;
if (pSql == NULL || pSql->signature != pSql) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->command == TSDB_SQL_INSERT) {
return NULL;
}
// set the sql object owner
tscSetSqlOwner(pSql);
// current data set are exhausted, fetch more data from node
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE ||
pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE ||
pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
pCmd->command == TSDB_SQL_SERV_STATUS ||
pCmd->command == TSDB_SQL_CURRENT_DB ||
pCmd->command == TSDB_SQL_SERV_VERSION ||
pCmd->command == TSDB_SQL_CLI_VERSION ||
pCmd->command == TSDB_SQL_CURRENT_USER )) {
taos_fetch_rows_a(pSql, tscSCreateCallBack, param);
return NULL;
}
void* data = doSetResultRowData(pSql, true);
tscClearSqlOwner(pSql);
return data;
}
static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) {
TAOS_ROW row = tscFetchRow(builder);
SSqlObj* pSql = builder->pInterSql;
if (row == NULL) {
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
int32_t* lengths = taos_fetch_lengths(pSql);
int num_fields = taos_num_fields(pSql);
TAOS_FIELD *fields = taos_fetch_fields(pSql);
char buf[TSDB_COL_NAME_LEN + 16];
for (int i = 0; i < num_fields; i++) {
memset(buf, 0, sizeof(buf));
int32_t ret = tscGetNthFieldResult(row, fields, lengths, i, buf);
if (i == 0) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s", "(");
}
if ((fields[i].type == TSDB_DATA_TYPE_NCHAR
|| fields[i].type == TSDB_DATA_TYPE_BINARY
|| fields[i].type == TSDB_DATA_TYPE_TIMESTAMP) && 0 == ret) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "\"%s\",", buf);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s,", buf);
}
if (i == num_fields - 1) {
sprintf(result + strlen(result) - 1, "%s", ")");
}
}
if (0 == strlen(result)) {
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
return TSDB_CODE_SUCCESS;
}
// build 'show create table/database' result fields
static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const char *ddl) {
int32_t rowLen = 0;
int16_t ddlLen = (int16_t)strlen(ddl);
SColumnIndex index = {0};
pSql->cmd.numOfCols = 2;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f;
if (type == SCREATE_BUILD_TABLE) {
f.type = TSDB_DATA_TYPE_BINARY;
f.bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
tstrncpy(f.name, "Table", sizeof(f.name));
} else {
f.type = TSDB_DATA_TYPE_BINARY;
f.bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
tstrncpy(f.name, "Database", sizeof(f.name));
}
SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
f.bytes, f.bytes - VARSTR_HEADER_SIZE, false);
rowLen += f.bytes;
f.bytes = (int16_t)(ddlLen + VARSTR_HEADER_SIZE);
f.type = TSDB_DATA_TYPE_BINARY;
if (type == SCREATE_BUILD_TABLE) {
tstrncpy(f.name, "Create Table", sizeof(f.name));
} else {
tstrncpy(f.name, "Create Database", sizeof(f.name));
}
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(int16_t)(ddlLen + VARSTR_HEADER_SIZE), ddlLen, false);
rowLen += ddlLen + VARSTR_HEADER_SIZE;
return rowLen;
}
static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const char *tableName, const char *ddl) {
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
int32_t numOfRows = 1;
if (strlen(ddl) == 0) {
}
tscInitResObjForLocalQuery(pSql, numOfRows, rowLen);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
char* dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(dst, tableName, pField->bytes);
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(dst, ddl, pField->bytes);
return 0;
}
static int32_t tscSCreateBuildResult(SSqlObj *pSql, BuildType type, const char *str, const char *result) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
int32_t rowLen = tscSCreateBuildResultFields(pSql, type, result);
tscFieldInfoUpdateOffset(pQueryInfo);
return tscSCreateSetValueToResObj(pSql, rowLen, str, result);
}
int32_t tscRebuildCreateTableStatement(void *param,char *result) {
SCreateBuilder *builder = (SCreateBuilder *)param;
int32_t code = TSDB_CODE_SUCCESS;
char *buf = calloc(1,TSDB_MAX_BINARY_LEN);
if (buf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
code = tscGetTableTagValue(builder, buf);
if (code == TSDB_CODE_SUCCESS) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "CREATE TABLE %s USING %s TAGS %s", builder->buf, builder->sTableName, buf);
code = tscSCreateBuildResult(builder->pParentSql, SCREATE_BUILD_TABLE, builder->buf, result);
}
free(buf);
return code;
}
static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) {
TAOS_ROW row = tscFetchRow(builder);
if (row == NULL) {
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
const char *showColumns[] = {"REPLICA", "QUORUM", "DAYS", "KEEP", "BLOCKS", NULL};
SSqlObj *pSql = builder->pInterSql;
TAOS_FIELD *fields = taos_fetch_fields(pSql);
int num_fields = taos_num_fields(pSql);
char buf[TSDB_DB_NAME_LEN + 64] = {0};
do {
int32_t* lengths = taos_fetch_lengths(pSql);
int32_t ret = tscGetNthFieldResult(row, fields, lengths, 0, buf);
if (0 == ret && STR_NOCASE_EQUAL(buf, strlen(buf), builder->buf, strlen(builder->buf))) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "CREATE DATABASE %s", buf);
for (int i = 1; i < num_fields; i++) {
for (int j = 0; showColumns[j] != NULL; j++) {
if (STR_NOCASE_EQUAL(fields[i].name, strlen(fields[i].name), showColumns[j], strlen(showColumns[j]))) {
memset(buf, 0, sizeof(buf));
ret = tscGetNthFieldResult(row, fields, lengths, i, buf);
if (ret == 0) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), " %s %s", showColumns[j], buf);
}
}
}
}
break;
}
row = tscFetchRow(builder);
} while (row != NULL);
if (0 == strlen(result)) {
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
return TSDB_CODE_SUCCESS;
}
int32_t tscRebuildCreateDBStatement(void *param,char *result) {
SCreateBuilder *builder = (SCreateBuilder *)param;
int32_t code = TSDB_CODE_SUCCESS;
char *buf = calloc(1, TSDB_MAX_BINARY_LEN);
if (buf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
code = tscGetDBInfo(param, buf);
if (code == TSDB_CODE_SUCCESS) {
code = tscSCreateBuildResult(builder->pParentSql, SCREATE_BUILD_DB, builder->buf, buf);
}
free(buf);
return code;
}
static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
char *buf = (char *)malloc(TSDB_MAX_BINARY_LEN);
if (buf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
buf[0] = 0;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
free(buf);
return TSDB_CODE_TSC_INVALID_VALUE;
}
SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
int32_t numOfTags = tscGetNumOfTags(pMeta);
for (int32_t i = 0; i < numOfTags; i++) {
if (i != numOfTags - 1) {
snprintf(buf + strlen(buf), TSDB_MAX_BINARY_LEN - strlen(buf), "%s,", pTagsSchema[i].name);
} else {
snprintf(buf + strlen(buf), TSDB_MAX_BINARY_LEN - strlen(buf), "%s", pTagsSchema[i].name);
}
}
*result = buf;
return TSDB_CODE_SUCCESS;
}
static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SCreateBuilder *param = (SCreateBuilder *)malloc(sizeof(SCreateBuilder));
if (param == NULL) {
free(pInterSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
char fullName[TSDB_TABLE_FNAME_LEN] = {0};
extractDBName(pTableMetaInfo->name, fullName);
extractTableName(pMeta->sTableId, param->sTableName);
snprintf(fullName + strlen(fullName), TSDB_TABLE_FNAME_LEN - strlen(fullName), ".%s", param->sTableName);
extractTableName(pTableMetaInfo->name, param->buf);
param->pParentSql = pSql;
param->pInterSql = pInterSql;
param->fp = tscRebuildCreateTableStatement;
param->callStage = SCREATE_CALLBACK_QUERY;
char *query = (char *)calloc(1, TSDB_MAX_BINARY_LEN);
if (query == NULL) {
free(param);
free(pInterSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
char *columns = NULL;
int32_t code = tscGetTableTagColumnName(pSql, &columns) ;
if (code != TSDB_CODE_SUCCESS) {
free(param);
free(pInterSql);
free(query);
return code;
}
snprintf(query + strlen(query), TSDB_MAX_BINARY_LEN - strlen(query), "SELECT %s FROM %s WHERE TBNAME IN(\'%s\')", columns, fullName, param->buf);
doAsyncQuery(pSql->pTscObj, pInterSql, tscSCreateCallBack, param, query, strlen(query));
free(query);
free(columns);
return TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
int32_t numOfRows = tscGetNumOfColumns(pMeta);
SSchema *pSchema = tscGetTableSchema(pMeta);
char *result = ddl;
sprintf(result, "create table %s (", tableName);
for (int32_t i = 0; i < numOfRows; ++i) {
uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[pSchema[i].type].aName);
}
}
sprintf(result + strlen(result) - 1, "%s", ")");
return TSDB_CODE_SUCCESS;
}
static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, char *ddl) {
char *result = ddl;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
int32_t numOfRows = tscGetNumOfColumns(pMeta);
int32_t totalRows = numOfRows + tscGetNumOfTags(pMeta);
SSchema *pSchema = tscGetTableSchema(pMeta);
sprintf(result, "create table %s (", tableName);
for (int32_t i = 0; i < numOfRows; ++i) {
uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result),"%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName);
}
}
snprintf(result + strlen(result) - 1, TSDB_MAX_BINARY_LEN - strlen(result), "%s %s", ")", "TAGS (");
for (int32_t i = numOfRows; i < totalRows; i++) {
uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName);
}
}
sprintf(result + strlen(result) - 1, "%s", ")");
return TSDB_CODE_SUCCESS;
}
static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pTableMetaInfo->pTableMeta != NULL);
char tableName[TSDB_TABLE_NAME_LEN] = {0};
extractTableName(pTableMetaInfo->name, tableName);
char *result = (char *)calloc(1, TSDB_MAX_BINARY_LEN);
int32_t code = TSDB_CODE_SUCCESS;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscRebuildDDLForSuperTable(pSql, tableName, result);
} else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
code = tscRebuildDDLForNormalTable(pSql, tableName, result);
} else if (UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) {
code = tscRebuildDDLForSubTable(pSql, tableName, result);
} else {
code = TSDB_CODE_TSC_INVALID_VALUE;
}
if (code == TSDB_CODE_SUCCESS) {
code = tscSCreateBuildResult(pSql, SCREATE_BUILD_TABLE, tableName, result);
}
free(result);
return code;
}
static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SCreateBuilder *param = (SCreateBuilder *)malloc(sizeof(SCreateBuilder));
if (param == NULL) {
free(pInterSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
extractTableName(pTableMetaInfo->name, param->buf);
param->pParentSql = pSql;
param->pInterSql = pInterSql;
param->fp = tscRebuildCreateDBStatement;
param->callStage = SCREATE_CALLBACK_QUERY;
const char *query = "show databases";
doAsyncQuery(pSql->pTscObj, pInterSql, tscSCreateCallBack, param, query, strlen(query));
return TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resBytes = TSDB_USER_LEN + TSDB_DATA_TYPE_BINARY;
pExpr->resType = TSDB_DATA_TYPE_BINARY;
char* vx = calloc(1, pExpr->resBytes);
if (vx == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
@ -287,7 +807,7 @@ static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
size_t size = sizeof(pSql->pTscObj->user);
STR_WITH_MAXSIZE_TO_VARSTR(vx, pSql->pTscObj->user, size);
tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes);
free(vx);
@ -297,15 +817,15 @@ static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
char db[TSDB_DB_NAME_LEN] = {0};
extractDBName(pSql->pTscObj->db, db);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
size_t t = strlen(db);
pExpr->resBytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE;
char* vx = calloc(1, pExpr->resBytes);
if (vx == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
@ -316,7 +836,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
} else {
STR_WITH_SIZE_TO_VARSTR(vx, db, (VarDataLenT)t);
}
tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes);
free(vx);
@ -326,49 +846,53 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
static int32_t tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
size_t t = strlen(v);
pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE);
char* vx = calloc(1, pExpr->resBytes);
if (vx == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
STR_WITH_SIZE_TO_VARSTR(vx, v, (VarDataLenT)t);
tscSetLocalQueryResult(pSql, vx, pExpr->aliasName, pExpr->resType, pExpr->resBytes);
free(vx);
return TSDB_CODE_SUCCESS;
}
static int32_t tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
size_t t = strlen(version);
pExpr->resBytes = (int16_t)(t + VARSTR_HEADER_SIZE);
char* v = calloc(1, pExpr->resBytes);
if (v == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
STR_WITH_SIZE_TO_VARSTR(v, version, (VarDataLenT)t);
tscSetLocalQueryResult(pSql, v, pExpr->aliasName, pExpr->resType, pExpr->resBytes);
free(v);
return TSDB_CODE_SUCCESS;
}
static int32_t tscProcessServStatus(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
if (pObj->pHb != NULL) {
if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
@ -379,9 +903,9 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
return pSql->res.code;
}
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
int32_t val = 1;
tscSetLocalQueryResult(pSql, (char*) &val, pExpr->aliasName, TSDB_DATA_TYPE_INT, sizeof(int32_t));
@ -393,23 +917,23 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
SSqlRes *pRes = &pSql->res;
pCmd->numOfCols = 1;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pQueryInfo->order.order = TSDB_ORDER_ASC;
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
pQueryInfo->fieldsInfo.pFields = taosArrayInit(1, sizeof(TAOS_FIELD));
pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(1, sizeof(SFieldSupInfo));
TAOS_FIELD f = tscCreateField((int8_t)type, columnName, (int16_t)valueLength);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
tscInitResObjForLocalQuery(pSql, 1, (int32_t)valueLength);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, 0);
pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprList, 0);
memcpy(pRes->data, val, pField->bytes);
}
@ -428,6 +952,10 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
*/
pRes->qhandle = 0x1;
pRes->numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE) {
pRes->code = tscProcessShowCreateTable(pSql);
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE) {
pRes->code = tscProcessShowCreateDatabase(pSql);
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosCacheEmpty(tscMetaCache);
pRes->code = TSDB_CODE_SUCCESS;
@ -447,12 +975,13 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
}
// keep the code in local variable in order to avoid invalid read in case of async query
int32_t code = pRes->code;
if (code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS){
} else {
tscQueueAsyncRes(pSql);
}
return code;
}

View File

@ -1192,8 +1192,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
str += index;
if (TK_STRING == sToken.type) {
strdequote(sToken.z);
sToken.n = (uint32_t)strtrim(sToken.z);
tscDequoteAndTrimToken(&sToken);
}
if (sToken.type == TK_RP) {

View File

@ -545,7 +545,9 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0;
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
// wait for the callback function to post the semaphore
@ -574,7 +576,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
free(normal->sql);
}
tscFreeSqlObj(pStmt->pSql);
taos_free_result(pStmt->pSql);
free(pStmt);
return TSDB_CODE_SUCCESS;
}

View File

@ -368,7 +368,40 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return tscGetTableMeta(pSql, pTableMetaInfo);
}
case TSDB_SQL_SHOW_CREATE_TABLE: {
SStrToken* pToken = &pInfo->pDCLInfo->a[0];
const char* msg1 = "invalid table name";
const char* msg2 = "table name is too long";
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (!tscValidateTableNameLength(pToken->n)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
return tscGetTableMeta(pSql, pTableMetaInfo);
}
case TSDB_SQL_SHOW_CREATE_DATABASE: {
const char* msg1 = "invalid database name";
const char* msg2 = "table name is too long";
SStrToken* pToken = &pInfo->pDCLInfo->a[0];
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pToken->n > TSDB_DB_NAME_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
return TSDB_CODE_SUCCESS;
}
case TSDB_SQL_CFG_DNODE: {
const char* msg2 = "invalid configure options or values, such as resetlog / debugFlag 135 / balance 'vnode:2-dnode:2' / monitor 1 ";
const char* msg3 = "invalid dnode ep";
@ -4873,6 +4906,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
{"cDebugFlag", 10}, {"httpDebugFlag", 13}, {"qDebugflag", 10}, {"sdbDebugFlag", 12},
{"uDebugFlag", 10}, {"tsdbDebugFlag", 13}, {"sDebugflag", 10}, {"rpcDebugFlag", 12},
{"dDebugFlag", 10}, {"mqttDebugFlag", 13}, {"wDebugFlag", 10}, {"tmrDebugFlag", 12},
{"cqDebugFlag", 11},
};
SStrToken* pOptionToken = &pOptions->a[1];
@ -5286,9 +5320,12 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
int32_t tagLength = 0;
size_t size = taosArrayGetSize(pQueryInfo->exprList);
//todo is 0??
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TAGPRJ || pExpr->functionId == TSDB_FUNC_TAG) {
@ -5300,8 +5337,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
}
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
@ -5309,7 +5345,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
!(pExpr->functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->colInfo.flag))) {
SSchema* pColSchema = &pSchema[pExpr->colInfo.colIndex];
getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->functionId, (int32_t)pExpr->param[0].i64Key, &pExpr->resType,
&pExpr->resBytes, &pExpr->interBytes, tagLength, true);
&pExpr->resBytes, &pExpr->interBytes, tagLength, isSTable);
}
}
}
@ -5320,7 +5356,7 @@ static int32_t doUpdateSqlFunctionForColPrj(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_PRJ && (!TSDB_COL_IS_UD_COL(pExpr->colInfo.flag))) {
if (pExpr->functionId == TSDB_FUNC_PRJ && (!TSDB_COL_IS_UD_COL(pExpr->colInfo.flag) && (pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX))) {
bool qualifiedCol = false;
for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, j);
@ -5418,13 +5454,6 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
int16_t numOfSelectivity = 0;
int16_t numOfAggregation = 0;
// todo is 0??
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
if (!isSTable) {
return TSDB_CODE_SUCCESS;
}
size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, i);
@ -5726,7 +5755,7 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
char msg[512] = {0};
if (pCreate->walLevel != -1 && (pCreate->walLevel < TSDB_MIN_WAL_LEVEL || pCreate->walLevel > TSDB_MAX_WAL_LEVEL)) {
snprintf(msg, tListLen(msg), "invalid db option walLevel: %d, only 0-2 allowed", pCreate->walLevel);
snprintf(msg, tListLen(msg), "invalid db option walLevel: %d, only 1-2 allowed", pCreate->walLevel);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}

View File

@ -170,6 +170,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->sversion = pTableMetaMsg->sversion;
pTableMeta->tversion = pTableMetaMsg->tversion;
tstrncpy(pTableMeta->sTableId, pTableMetaMsg->sTableId, TSDB_TABLE_FNAME_LEN);
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);

View File

@ -128,6 +128,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
taosCorEndWrite(&pVgroupInfo->version);
}
void tscPrintMgmtEp() {
SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
@ -188,8 +189,8 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
if (tscShouldFreeHeartBeat(pHB)) {
tscDebug("%p free HB object and release connection", pHB);
tscFreeSqlObj(pHB);
tscCloseTscObj(pObj);
pObj->pHb = 0;
taos_free_result(pHB);
} else {
int32_t code = tscProcessSql(pHB);
if (code != TSDB_CODE_SUCCESS) {
@ -745,7 +746,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
/* column id is not valid according to the cached table meta, the table meta is expired */
tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
@ -1959,6 +1959,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
return 0;
}
// TODO multithread problem
static void createHBObj(STscObj* pObj) {
if (pObj->pHb != NULL) {
return;
@ -1986,12 +1987,11 @@ static void createHBObj(STscObj* pObj) {
pSql->param = pObj;
pSql->pTscObj = pObj;
pSql->signature = pSql;
registerSqlObj(pSql);
tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
pObj->pHb = pSql;
T_REF_INC(pObj);
tscAddSubqueryInfo(&pObj->pHb->cmd);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}
int tscProcessConnectRsp(SSqlObj *pSql) {
@ -2017,8 +2017,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pObj->connId = htonl(pConnect->connId);
createHBObj(pObj);
// taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0;
}
@ -2090,6 +2089,9 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
UNUSED(pSql);
return 0;
}
int tscProcessShowCreateRsp(SSqlObj *pSql) {
return tscLocalResultCommonBuilder(pSql, 1);
}
int tscProcessQueryRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
@ -2164,11 +2166,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_META;
T_REF_INC(pNew->pTscObj);
// TODO add test case on x86 platform
uint64_t adr = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000);
registerSqlObj(pNew);
tscAddSubqueryInfo(&pNew->cmd);
@ -2295,10 +2293,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
}
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
T_REF_INC(pNew->pTscObj);
registerSqlObj(pNew);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
pNew->fp = tscTableMetaCallBack;
@ -2376,6 +2372,10 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
tscKeepConn[TSDB_SQL_SHOW] = 1;
tscKeepConn[TSDB_SQL_RETRIEVE] = 1;
tscKeepConn[TSDB_SQL_SELECT] = 1;

View File

@ -156,10 +156,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
*taos = pObj;
}
T_REF_INC(pSql->pTscObj);
uint64_t key = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pSql;
@ -236,13 +233,21 @@ TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t us
return taos_connect(ipBuf, userBuf, passBuf, dbBuf, port);
}
static void asyncConnCallback(void *param, TAOS_RES *tres, int code) {
SSqlObj *pSql = (SSqlObj *) tres;
assert(pSql != NULL);
pSql->fetchFp(pSql->param, tres, code);
}
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) {
SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, fp, param, taos);
SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, taos);
if (pSql == NULL) {
return NULL;
}
pSql->fetchFp = fp;
pSql->res.code = tscProcessSql(pSql);
tscDebug("%p DB async connection is opening", taos);
return taos;
@ -255,33 +260,17 @@ void taos_close(TAOS *taos) {
return;
}
if (pObj->pHb != NULL) {
if (pObj->pHb->pRpcCtx != NULL) { // wait for rsp from dnode
rpcCancelRequest(pObj->pHb->pRpcCtx);
SSqlObj* pHb = pObj->pHb;
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode
rpcCancelRequest(pHb->pRpcCtx);
pHb->pRpcCtx = NULL;
}
tscSetFreeHeatBeat(pObj);
tscFreeSqlObj(pObj->pHb);
tscDebug("%p HB is freed", pHb);
taos_free_result(pHb);
}
// free all sqlObjs created by using this connect before free the STscObj
// while(1) {
// pthread_mutex_lock(&pObj->mutex);
// void* p = pObj->sqlList;
// pthread_mutex_unlock(&pObj->mutex);
//
// if (p == NULL) {
// break;
// }
//
// tscDebug("%p waiting for sqlObj to be freed, %p", pObj, p);
// taosMsleep(100);
//
// // todo fix me!! two threads call taos_free_result will cause problem.
// tscDebug("%p free :%p", pObj, p);
// taos_free_result(p);
// }
int32_t ref = T_REF_DEC(pObj);
assert(ref >= 0);
@ -485,6 +474,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE ||
pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE ||
pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
pCmd->command == TSDB_SQL_SERV_STATUS ||
@ -795,14 +786,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
tscDebug("%p Valid SQL: %s pObj:%p", pSql, sql, pObj);
int32_t sqlLen = (int32_t)strlen(sql);
@ -838,11 +832,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
tsem_wait(&pSql->rspSem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj);
}
taos_free_result(pSql);
taos_free_result(pSql);
return code;
}
@ -941,34 +936,32 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res;
pRes->code = 0;
pRes->numOfTotal = 0; // the number of getting table meta from server
pRes->numOfClauseTotal = 0;
pRes->code = 0;
assert(pSql->fp == NULL);
tscDebug("%p tableNameList: %s pObj:%p", pSql, tableNameList, pObj);
int32_t tblListLen = (int32_t)strlen(tableNameList);
if (tblListLen > MAX_TABLE_NAME_LENGTH) {
tscError("%p tableNameList too long, length:%d, maximum allowed:%d", pSql, tblListLen, MAX_TABLE_NAME_LENGTH);
pRes->code = TSDB_CODE_TSC_INVALID_SQL;
taosTFree(pSql);
return pRes->code;
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
char *str = calloc(1, tblListLen + 1);
if (str == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql);
taosTFree(pSql);
return pRes->code;
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
strtolower(str, tableNameList);
pRes->code = (uint8_t)tscParseTblNameList(pSql, str, tblListLen);
int32_t code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen);
/*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
@ -978,17 +971,17 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pRes->qhandle = 0;
free(str);
if (pRes->code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
return pRes->code;
return code;
}
tscDoQuery(pSql);
tscDebug("%p load multi metermeta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscPartiallyFreeSqlObj(pSql);
tscDebug("%p load multi table meta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
}
return pRes->code;
return code;
}

View File

@ -136,7 +136,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval;
} else {
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
//etime = taosGetIntervalStartTimestamp(etime, pStream->interval.sliding, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision);
}
pQueryInfo->window.ekey = etime;
if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) {
@ -454,17 +453,11 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
}
} else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now
stime = pQueryInfo->window.skey;
if (stime == INT64_MIN) {
stime = (int64_t)taosGetTimestamp(pStream->precision);
stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
stime = taosTimeTruncate(stime - 1, &pStream->interval, pStream->precision);
//stime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
//stime = taosGetIntervalStartTimestamp(stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime);
if (pQueryInfo->window.skey != INT64_MIN) {
stime = pQueryInfo->window.skey;
}
stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
} else {
//int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
if (newStime != stime) {
tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime);
@ -477,8 +470,10 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
}
static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
int64_t timer = pStream->stime - taosGetTimestamp(pStream->precision);
if (timer < 0) timer = 0;
int64_t timer = 0, now = taosGetTimestamp(pStream->precision);
if (pStream->stime > now) {
timer = pStream->stime - now;
}
int64_t startDelay =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay;
@ -515,6 +510,8 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return;
}
registerSqlObj(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
@ -568,6 +565,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream->pSql = pSql;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
@ -575,6 +573,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscFreeSqlObj(pSql);
return NULL;
}
strtolower(pSql->sqlstr, sqlstr);
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
@ -615,10 +614,9 @@ void taos_close_stream(TAOS_STREAM *handle) {
tscDebug("%p stream:%p is closed", pSql, pStream);
// notify CQ to release the pStream object
pStream->fp(pStream->param, NULL, NULL);
tscFreeSqlObj(pSql);
pStream->pSql = NULL;
taos_free_result(pSql);
taosTFree(pStream);
}
}

View File

@ -105,6 +105,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TAOS_SYSTEM_ERROR(errno);
goto fail;
}
tstrncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) {
@ -119,6 +120,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail;
}
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->pSubscription = pSub;
@ -142,6 +144,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail;
}
strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qhandle = 0;
pRes->numOfRows = 1;
@ -152,11 +155,14 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail;
}
registerSqlObj(pSql);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
line = __LINE__;
goto fail;
@ -173,9 +179,15 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
fail:
tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
if (pSql != NULL) {
tscFreeSqlObj(pSql);
if (pSql->self != NULL) {
taos_free_result(pSql);
} else {
tscFreeSqlObj(pSql);
}
pSql = NULL;
}
if (pSub != NULL) {
taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem);
@ -494,6 +506,10 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
}
}
if (pSub->pSql != NULL) {
taos_free_result(pSub->pSql);
}
tscFreeSqlObj(pSub->pSql);
taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem);

View File

@ -23,6 +23,7 @@
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tscSubquery.h"
typedef struct SInsertSupporter {
SSubqueryState* pState;
@ -278,7 +279,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscDebug("%p subIndex: %d, no need to launch query, ignore it", pSql, i);
tscDestroyJoinSupporter(pSupporter);
tscFreeSqlObj(pPrevSub);
taos_free_result(pPrevSub);
pSql->pSubs[i] = NULL;
continue;
@ -1383,7 +1384,7 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState
taosTFree(pSupport->localBuffer);
taosTFree(pSupport);
tscFreeSqlObj(pSub);
taos_free_result(pSub);
}
free(pState);

View File

@ -122,11 +122,8 @@ void taos_init_imp(void) {
tscInitMsgsFp();
int queueSize = tsMaxConnections*2;
if (tscEmbedded == 0) {
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 2.0);
} else {
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 4.0);
}
double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
if (tscNumOfThreads < 2) tscNumOfThreads = 2;
@ -140,11 +137,8 @@ void taos_init_imp(void) {
if(0 == tscEmbedded){
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
}
int64_t refreshTime = tsTableMetaKeepTimer;
refreshTime = refreshTime > 10 ? 10 : refreshTime;
refreshTime = refreshTime < 10 ? 10 : refreshTime;
int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj");

View File

@ -391,10 +391,21 @@ static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) {
*/
void tscFreeSqlObjInCache(void *pSql) {
assert(pSql != NULL);
SSqlObj** p = (SSqlObj**)pSql;
STscObj* pTscObj = (*p)->pTscObj;
assert((*p)->self != 0 && (*p)->self == (p));
tscFreeSqlObj(*p);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
tscCloseTscObj(pTscObj);
}
}
void tscFreeSqlObj(SSqlObj* pSql) {
@ -403,7 +414,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
}
tscDebug("%p start to free sqlObj", pSql);
STscObj* pTscObj = pSql->pTscObj;
tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql);
@ -421,14 +431,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tsem_destroy(&pSql->rspSem);
free(pSql);
tscDebug("%p free sqlObj completed", pSql);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
if (ref == 0) {
tscCloseTscObj(pTscObj);
}
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
@ -1265,6 +1267,51 @@ static int32_t validateQuoteToken(SStrToken* pToken) {
return TSDB_CODE_SUCCESS;
}
void tscDequoteAndTrimToken(SStrToken* pToken) {
assert(pToken->type == TK_STRING);
uint32_t first = 0, last = pToken->n;
// trim leading spaces
while (first < last) {
char c = pToken->z[first];
if (c != ' ' && c != '\t') {
break;
}
first++;
}
// trim ending spaces
while (first < last) {
char c = pToken->z[last - 1];
if (c != ' ' && c != '\t') {
break;
}
last--;
}
// there are still at least two characters
if (first < last - 1) {
char c = pToken->z[first];
// dequote
if ((c == '\'' || c == '"') && c == pToken->z[last - 1]) {
first++;
last--;
}
}
// left shift the string and pad spaces
for (uint32_t i = 0; i + first < last; i++) {
pToken->z[i] = pToken->z[first + i];
}
for (uint32_t i = last - first; i < pToken->n; i++) {
pToken->z[i] = ' ';
}
// adjust token length
pToken->n = last - first;
}
int32_t tscValidateName(SStrToken* pToken) {
if (pToken->type != TK_STRING && pToken->type != TK_ID) {
return TSDB_CODE_TSC_INVALID_SQL;
@ -1735,6 +1782,16 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->numOfRows = 0;
}
void registerSqlObj(SSqlObj* pSql) {
int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec
int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
uint64_t p = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME);
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
@ -1743,13 +1800,12 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
}
pNew->pTscObj = pSql->pTscObj;
T_REF_INC(pNew->pTscObj);
pNew->signature = pNew;
SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd;
pCmd->parseFinished = 1;
pCmd->autoCreated = pSql->cmd.autoCreated;
if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew);
@ -1764,8 +1820,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) {
tscError("%p new subquery failed", pSql);
free(pNew);
tscFreeSqlObj(pNew);
return NULL;
}
@ -1776,10 +1831,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
registerSqlObj(pNew);
return pNew;
}
@ -1869,7 +1921,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
T_REF_INC(pNew->pTscObj);
pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) {
@ -2018,10 +2069,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
}
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10);
registerSqlObj(pNew);
return pNew;
_error:

View File

@ -78,6 +78,9 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_LOCALMERGE, "retrieve-localmerge" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_TABLE_JOIN_RETRIEVE, "join-retrieve" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_TABLE, "show-create-table")
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_DATABASE, "show-create-database")
/*
* build empty result instead of accessing dnode to fetch result
* reset the client cache

View File

@ -174,6 +174,7 @@ extern int32_t rpcDebugFlag;
extern int32_t odbcDebugFlag;
extern int32_t qDebugFlag;
extern int32_t wDebugFlag;
extern int32_t cqDebugFlag;
extern int32_t debugFlag;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -131,7 +131,7 @@ uint16_t tsHttpPort = 6041; // only tcp, range tcp[6041]
int32_t tsHttpCacheSessions = 1000;
int32_t tsHttpSessionExpire = 36000;
int32_t tsHttpMaxThreads = 2;
int32_t tsHttpEnableCompress = 0;
int32_t tsHttpEnableCompress = 1;
int32_t tsHttpEnableRecordSql = 0;
int32_t tsTelegrafUseFieldNum = 0;
@ -203,6 +203,7 @@ int32_t debugFlag = 0;
int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 135;
int32_t (*monitorStartSystemFp)() = NULL;
void (*monitorStopSystemFp)() = NULL;
@ -222,12 +223,13 @@ void taosSetAllDebugFlag() {
httpDebugFlag = debugFlag;
mqttDebugFlag = debugFlag;
monitorDebugFlag = debugFlag;
qDebugFlag = debugFlag;
rpcDebugFlag = debugFlag;
uDebugFlag = debugFlag;
sDebugFlag = debugFlag;
wDebugFlag = debugFlag;
tsdbDebugFlag = debugFlag;
qDebugFlag = debugFlag;
cqDebugFlag = debugFlag;
uInfo("all debug flag are set to %d", debugFlag);
}
}
@ -1209,6 +1211,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "cqDebugFlag";
cfg.ptr = &cqDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG;
cfg.minValue = 0;
cfg.maxValue = 255;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "tscEnableRecordSql";
cfg.ptr = &tsTscEnableRecordSql;
cfg.valType = TAOS_CFG_VTYPE_INT32;

@ -1 +1 @@
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
Subproject commit 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0

View File

@ -5,10 +5,9 @@
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.6</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
<description>TDengine JDBC Driver</description>

View File

@ -291,8 +291,8 @@ public class TSDBDriver implements java.sql.Driver {
return null;
}
String lowerUrl = url.toLowerCase();
if (!lowerUrl.startsWith(URL_PREFIX) && !lowerUrl.startsWith(URL_PREFIX1)) {
// String lowerUrl = url.toLowerCase();
if (!url.startsWith(URL_PREFIX) && !url.startsWith(URL_PREFIX1)) {
return null;
}

View File

@ -2,6 +2,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
IF (TD_LINUX)

View File

@ -21,6 +21,7 @@
#include <string.h>
#include "taos.h"
#include "tsclient.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "ttimer.h"
@ -65,8 +66,6 @@ typedef struct SCqObj {
SCqContext * pContext;
} SCqObj;
int cqDebugFlag = 135;
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
@ -238,24 +237,31 @@ void cqDrop(void *handle) {
pthread_mutex_unlock(&pContext->mutex);
}
static void doCreateStream(void *param, TAOS_RES *result, int code) {
SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result;
pContext->dbConn = pSql->pTscObj;
cqCreateStream(pContext, pObj);
}
static void cqProcessCreateTimer(void *param, void *tmrId) {
SCqObj* pObj = (SCqObj*)param;
SCqContext* pContext = pObj->pContext;
if (pContext->dbConn == NULL) {
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0);
if (pContext->dbConn == NULL) {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
}
cDebug("vgId:%d, try connect to TDengine", pContext->vgId);
taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
} else {
cqCreateStream(pContext, pObj);
}
cqCreateStream(pContext, pObj);
}
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext;
if (pContext->dbConn == NULL) {
cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId);
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl);
return;
}

View File

@ -673,6 +673,7 @@ typedef struct {
typedef struct STableMetaMsg {
int32_t contLen;
char tableId[TSDB_TABLE_FNAME_LEN]; // table id
char sTableId[TSDB_TABLE_FNAME_LEN];
uint8_t numOfTags;
uint8_t precision;
uint8_t tableType;

View File

@ -16,6 +16,7 @@
#ifndef TDENGINE_TTOKENDEF_H
#define TDENGINE_TTOKENDEF_H
#define TK_ID 1
#define TK_BOOL 2
#define TK_TINYINT 3
@ -75,24 +76,24 @@
#define TK_VNODES 57
#define TK_IPTOKEN 58
#define TK_DOT 59
#define TK_TABLES 60
#define TK_STABLES 61
#define TK_VGROUPS 62
#define TK_DROP 63
#define TK_TABLE 64
#define TK_DATABASE 65
#define TK_DNODE 66
#define TK_USER 67
#define TK_ACCOUNT 68
#define TK_USE 69
#define TK_DESCRIBE 70
#define TK_ALTER 71
#define TK_PASS 72
#define TK_PRIVILEGE 73
#define TK_LOCAL 74
#define TK_IF 75
#define TK_EXISTS 76
#define TK_CREATE 77
#define TK_CREATE 60
#define TK_TABLE 61
#define TK_DATABASE 62
#define TK_TABLES 63
#define TK_STABLES 64
#define TK_VGROUPS 65
#define TK_DROP 66
#define TK_DNODE 67
#define TK_USER 68
#define TK_ACCOUNT 69
#define TK_USE 70
#define TK_DESCRIBE 71
#define TK_ALTER 72
#define TK_PASS 73
#define TK_PRIVILEGE 74
#define TK_LOCAL 75
#define TK_IF 76
#define TK_EXISTS 77
#define TK_PPS 78
#define TK_TSERIES 79
#define TK_DBS 80
@ -222,7 +223,6 @@
#define TK_INTO 204
#define TK_VALUES 205
#define TK_SPACE 300
#define TK_COMMENT 301
#define TK_ILLEGAL 302

View File

@ -44,6 +44,7 @@ typedef void* twalh; // WAL HANDLE
typedef int (*FWalWrite)(void *ahandle, void *pHead, int type);
twalh walOpen(const char *path, const SWalCfg *pCfg);
int walAlter(twalh pWal, const SWalCfg *pCfg);
void walClose(twalh);
int walRenew(twalh);
int walWrite(twalh, SWalHead *);

View File

@ -910,13 +910,13 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
}
if (walLevel > 0 && walLevel != pDb->cfg.walLevel) {
mError("db:%s, can't alter walLevel option", pDb->name);
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
mDebug("db:%s, walLevel:%d change to %d", pDb->name, pDb->cfg.walLevel, walLevel);
newCfg.walLevel = walLevel;
}
if (fsyncPeriod >= 0 && fsyncPeriod != pDb->cfg.fsyncPeriod) {
mError("db:%s, can't alter fsyncPeriod option", pDb->name);
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
mDebug("db:%s, fsyncPeriod:%d change to %d", pDb->name, pDb->cfg.fsyncPeriod, fsyncPeriod);
newCfg.fsyncPeriod = fsyncPeriod;
}
if (replications > 0 && replications != pDb->cfg.replications) {

View File

@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, false, mnodeFreeShowObj, "show");
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 5, true, mnodeFreeShowObj, "show");
return 0;
}
@ -389,10 +389,12 @@ static bool mnodeAccquireShowObj(SShowObj *pShow) {
}
static void* mnodePutShowObj(SShowObj *pShow) {
const int32_t DEFAULT_SHOWHANDLE_LIFE_SPAN = tsShellActivityTimer * 6 * 1000;
if (tsMnodeShowCache != NULL) {
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
uint64_t handleVal = (uint64_t)pShow;
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(int64_t), &pShow, sizeof(int64_t), 6000);
SShowObj **ppShow = taosCachePut(tsMnodeShowCache, &handleVal, sizeof(int64_t), &pShow, sizeof(int64_t), DEFAULT_SHOWHANDLE_LIFE_SPAN);
pShow->ppShow = (void**)ppShow;
mDebug("%p, show is put into cache, data:%p index:%d", pShow, ppShow, pShow->index);
return pShow;

View File

@ -1384,6 +1384,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
}
pShow->numOfReads += numOfRows;
const int32_t NUM_OF_COLUMNS = 5;
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
mnodeDecDbRef(pDb);
return numOfRows;
@ -2090,6 +2093,9 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = pTable->info.type;
tstrncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
if (pTable->superTable) {
tstrncpy(pMeta->sTableId, pTable->superTable->info.tableId, TSDB_TABLE_FNAME_LEN);
}
if (pTable->info.type == TSDB_CHILD_TABLE) {
pMeta->sversion = htons(pTable->superTable->sversion);
@ -2122,8 +2128,8 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
}
pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId);
mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->uid);
mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->uid, pTable->vgId, pTable->sid);
return TSDB_CODE_SUCCESS;
}

View File

@ -660,13 +660,13 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
for (int32_t i = 0; i < pShow->maxReplica; ++i) {
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "dnode");
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "dnode%d", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vstatus");
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%dstatus", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
}

View File

@ -19,7 +19,7 @@
#include <stdint.h>
#include <stdbool.h>
#define JSON_BUFFER_SIZE 10240
#define JSON_BUFFER_SIZE 16384
struct HttpContext;
enum { JsonNumber, JsonString, JsonBoolean, JsonArray, JsonObject, JsonNull };

View File

@ -52,12 +52,12 @@ int32_t httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int32_t
}
if (len < 0) {
httpDebug("context:%p, fd:%d, socket write errno:%d, times:%d", pContext, pContext->fd, errno, countWait);
httpDebug("context:%p, fd:%d, socket write errno:%d:%s, times:%d", pContext, pContext->fd, errno, strerror(errno), countWait);
if (++countWait > HTTP_WRITE_RETRY_TIMES) break;
taosMsleep(HTTP_WRITE_WAIT_TIME_MS);
continue;
} else if (len == 0) {
httpDebug("context:%p, fd:%d, socket write errno:%d, connect already closed", pContext, pContext->fd, errno);
httpDebug("context:%p, fd:%d, socket write errno:%d:%s, connect already closed", pContext, pContext->fd, errno, strerror(errno));
break;
} else {
countWait = 0;
@ -131,14 +131,14 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, (const char*)compressBuf, compressBufLen);
} else {
httpTrace("context:%p, fd:%d, last:%d, compress already dumped, response:\n%s", buf->pContext,
httpDebug("context:%p, fd:%d, last:%d, compress already dumped, response:\n%s", buf->pContext,
buf->pContext->fd, isTheLast, buf->buf);
return 0; // there is no data to dump.
remain = 0; // there is no data to dump.
}
} else {
httpError("context:%p, fd:%d, failed to compress data, chunkSize:%" PRIu64 ", last:%d, error:%d, response:\n%s",
buf->pContext, buf->pContext->fd, srcLen, isTheLast, ret, buf->buf);
return 0;
remain = 0;
}
}

View File

@ -257,20 +257,20 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int unUsedCo
HttpEncodeMethod *encode = pContext->encodeMethod;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd,
pContext->user, pContext->session->taos, tstrerror(code), (SSqlObj *)result);
httpError("context:%p, fd:%d, user:%s, query error, code:%s:inprogress, sqlObj:%p", pContext, pContext->fd,
pContext->user, tstrerror(code), (SSqlObj *)result);
return;
}
if (code < 0) {
SSqlObj *pObj = (SSqlObj *)result;
if (code == TSDB_CODE_TSC_INVALID_SQL) {
httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p, error:%s", pContext,
pContext->fd, pContext->user, pContext->session->taos, tstrerror(code), pObj, pObj->cmd.payload);
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext,
pContext->fd, pContext->user, tstrerror(code), pObj, pObj->cmd.payload);
httpSendTaosdInvalidSqlErrorResp(pContext, pObj->cmd.payload);
} else {
httpError("context:%p, fd:%d, user:%s, query error, taos:%p, code:%s, sqlObj:%p", pContext, pContext->fd,
pContext->user, pContext->session->taos, tstrerror(code), pObj);
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p", pContext, pContext->fd,
pContext->user, tstrerror(code), pObj);
httpSendErrorResp(pContext, code);
}
taos_free_result(result);

View File

@ -406,15 +406,21 @@ int32_t httpGzipCompressInit(HttpContext *pContext) {
int32_t httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData, bool isTheLast) {
int32_t err = 0;
int32_t lastTotalLen = (int32_t) (pContext->gzipStream.total_out);
pContext->gzipStream.next_in = (Bytef *) srcData;
pContext->gzipStream.avail_in = (uLong) nSrcData;
pContext->gzipStream.next_out = (Bytef *) destData;
pContext->gzipStream.avail_out = (uLong) (*nDestData);
while (pContext->gzipStream.avail_in != 0 && pContext->gzipStream.total_out < (uLong) (*nDestData)) {
while (pContext->gzipStream.avail_in != 0) {
if (deflate(&pContext->gzipStream, Z_FULL_FLUSH) != Z_OK) {
return -1;
}
int32_t cacheLen = pContext->gzipStream.total_out - lastTotalLen;
if (cacheLen >= *nDestData) {
return -2;
}
}
if (pContext->gzipStream.avail_in != 0) {
@ -427,16 +433,16 @@ int32_t httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData,
break;
}
if (err != Z_OK) {
return -2;
return -3;
}
}
if (deflateEnd(&pContext->gzipStream) != Z_OK) {
return -3;
return -4;
}
}
*nDestData = (int32_t) (pContext->gzipStream.total_out);
*nDestData = (int32_t) (pContext->gzipStream.total_out) - lastTotalLen;
return 0;
}

View File

@ -80,6 +80,7 @@ cmd ::= SHOW GRANTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_GRANTS, 0, 0);
cmd ::= SHOW VNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VNODES, 0, 0); }
cmd ::= SHOW VNODES IPTOKEN(X). { setShowOptions(pInfo, TSDB_MGMT_TABLE_VNODES, &X, 0); }
%type dbPrefix {SStrToken}
dbPrefix(A) ::=. {A.n = 0; A.type = 0;}
dbPrefix(A) ::= ids(X) DOT. {A = X; }
@ -88,6 +89,15 @@ dbPrefix(A) ::= ids(X) DOT. {A = X; }
cpxName(A) ::= . {A.n = 0; }
cpxName(A) ::= DOT ids(Y). {A = Y; A.n += 1; }
cmd ::= SHOW CREATE TABLE ids(X) cpxName(Y). {
X.n += Y.n;
setDCLSQLElems(pInfo, TSDB_SQL_SHOW_CREATE_TABLE, 1, &X);
}
cmd ::= SHOW CREATE DATABASE ids(X). {
setDCLSQLElems(pInfo, TSDB_SQL_SHOW_CREATE_DATABASE, 1, &X);
}
cmd ::= SHOW dbPrefix(X) TABLES. {
setShowOptions(pInfo, TSDB_MGMT_TABLE_TABLE, &X, 0);
}

View File

@ -2120,7 +2120,7 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
}
}
// no statistics data
// no statistics data, load the true data block
if (index == -1) {
return true;
}
@ -2130,8 +2130,17 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
return true;
}
// all points in current column are NULL, no need to check its boundary value
// all data in current column are NULL, no need to check its boundary value
if (pDataStatis[index].numOfNull == numOfRows) {
// if isNULL query exists, load the null data column
for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) {
SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j];
if (pFilterElem->fp == isNull_filter) {
return true;
}
}
continue;
}
@ -2957,11 +2966,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
STableQueryInfo *item = taosArrayGetP(pGroup, i);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
pageList = list;
tid = TSDB_TABLEID(item->pTable)->tid;
if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) {
pTableList[numOfTables++] = item;
tid = TSDB_TABLEID(item->pTable)->tid;
pageList = list;
}
}

File diff suppressed because it is too large Load Diff

View File

@ -580,6 +580,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle;
pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
if (pConn->chandle == NULL) {
tError("failed to connect to:0x%x:%d", pConn->peerIp, pConn->peerPort);
terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcCloseConn(pConn);
pConn = NULL;

View File

@ -136,6 +136,7 @@ typedef struct SSkipListIterator {
SSkipListNode *cur;
int32_t step; // the number of nodes that have been checked already
int32_t order; // order of the iterator
SSkipListNode *next; // next points to the true qualified node in skip list
} SSkipListIterator;
/**

View File

@ -97,7 +97,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
assert(size > 0);
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes",
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
if (pCacheObj->freeFp) {
@ -260,7 +260,12 @@ static void incRefFn(void* ptNode) {
}
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0 || pCacheObj->deleting == 1) {
if (pCacheObj == NULL || pCacheObj->deleting == 1) {
return NULL;
}
if (taosHashGetSize(pCacheObj->pHashTable) == 0) {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
return NULL;
}

View File

@ -79,9 +79,12 @@ static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t
// when order is TSDB_ORDER_ASC, return the last node with key less than val
// when order is TSDB_ORDER_DESC, return the first node with key large than val
static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order) {
static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order, SSkipListNode** pCur) {
__compar_fn_t comparFn = pSkipList->comparFn;
SSkipListNode *pNode = NULL;
if (pCur != NULL) {
*pCur = NULL;
}
if (order == TSDB_ORDER_ASC) {
pNode = pSkipList->pHead;
@ -93,6 +96,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_
pNode = p;
p = SL_GET_FORWARD_POINTER(p, i);
} else {
if (pCur != NULL) {
*pCur = p;
}
break;
}
}
@ -107,6 +113,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_
pNode = p;
p = SL_GET_BACKWARD_POINTER(p, i);
} else {
if (pCur != NULL) {
*pCur = p;
}
break;
}
}
@ -295,7 +304,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) {
pthread_rwlock_wrlock(pSkipList->lock);
}
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC);
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL);
while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) {
@ -452,7 +461,7 @@ uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
pthread_rwlock_wrlock(pSkipList->lock);
}
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC);
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL);
while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) {
@ -545,7 +554,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
pthread_rwlock_rdlock(pSkipList->lock);
}
iter->cur = getPriorNode(pSkipList, val, order);
iter->cur = getPriorNode(pSkipList, val, order, &iter->next);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
@ -567,8 +576,22 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
if (iter->cur != iter->next && (iter->next != NULL)) {
iter->cur = iter->next;
}
iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else { // descending order iterate
iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
if (iter->cur != iter->next && (iter->next != NULL)) {
iter->cur = iter->next;
}
iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0);
}
if (pSkipList->lock) {
@ -715,9 +738,11 @@ SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order)
iter->order = order;
if(order == TSDB_ORDER_ASC) {
iter->cur = pSkipList->pHead;
iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else {
iter->cur = pSkipList->pTail;
iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0);
}
return iter;
}

View File

@ -186,6 +186,12 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
return code;
}
code = walAlter(pVnode->wal, &pVnode->walCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
return code;
}
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY;
@ -390,6 +396,7 @@ void vnodeRelease(void *pVnodeRaw) {
if (0 == tsEnableVnodeBak) {
vInfo("vgId:%d, vnode backup not enabled", pVnode->vgId);
} else {
taosRemoveDir(newDir);
taosRename(rootDir, newDir);
}

View File

@ -58,7 +58,8 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return TSDB_CODE_APP_NOT_READY;
// TODO: Later, let slave to support query
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_APP_NOT_READY;
}

View File

@ -69,6 +69,13 @@ static void walModuleInitFunc() {
wDebug("WAL module is initialized");
}
static inline bool walNeedFsyncTimer(SWal *pWal) {
if (pWal->fsyncPeriod > 0 && pWal->level == TAOS_WAL_FSYNC) {
return true;
}
return false;
}
void *walOpen(const char *path, const SWalCfg *pCfg) {
SWal *pWal = calloc(sizeof(SWal), 1);
if (pWal == NULL) {
@ -95,7 +102,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL);
if (pWal->fsyncPeriod > 0 && pWal->level == TAOS_WAL_FSYNC) {
if (walNeedFsyncTimer(pWal)) {
pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
if (pWal->timer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
@ -127,6 +134,37 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
return pWal;
}
int walAlter(twalh wal, const SWalCfg *pCfg) {
SWal *pWal = wal;
if (pWal == NULL) {
return TSDB_CODE_WAL_APP_ERROR;
}
if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) {
wDebug("wal:%s, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->name, pWal->level,
pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod);
return TSDB_CODE_SUCCESS;
}
wInfo("wal:%s, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->name, pWal->level, pWal->fsyncPeriod,
pCfg->walLevel, pCfg->fsyncPeriod);
pthread_mutex_lock(&pWal->mutex);
pWal->level = pCfg->walLevel;
pWal->fsyncPeriod = pCfg->fsyncPeriod;
if (walNeedFsyncTimer(pWal)) {
wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer,walTmrCtrl);
} else {
wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
taosTmrStop(pWal->timer);
pWal->timer = NULL;
}
pthread_mutex_unlock(&pWal->mutex);
return TSDB_CODE_SUCCESS;
}
void walClose(void *handle) {
if (handle == NULL) return;
@ -484,6 +522,12 @@ static void walProcessFsyncTimer(void *param, void *tmrId) {
if (fsync(pWal->fd) < 0) {
wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
}
pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
if (walNeedFsyncTimer(pWal)) {
pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
} else {
wInfo("wal:%s, stop fsync timer for walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
taosTmrStop(pWal->timer);
pWal->timer = NULL;
}
}

View File

@ -11,7 +11,6 @@ public class JDBCConnectorChecker {
private static String tbName = "weather";
private Connection connection;
/**
* get connection
**/
@ -170,5 +169,4 @@ public class JDBCConnectorChecker {
checker.close();
}
}

View File

@ -0,0 +1,5 @@
#!/bin/bash
bash ./case001/case001.sh
#bash ./case002/case002.sh
#bash ./case003/case003.sh

View File

@ -0,0 +1,308 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/taosSql"
"log"
"time"
)
func main() {
taosDriverName := "taosSql"
demodb := "demodb"
demot := "demot"
fmt.Printf("\n======== start demo test ========\n")
// open connect to taos server
db, err := sql.Open(taosDriverName, "root:taosdata@/tcp(192.168.1.217:7100)/")
if err != nil {
log.Fatalf("Open database error: %s\n", err)
}
defer db.Close()
drop_database(db, demodb)
create_database(db, demodb)
use_database(db, demodb)
create_table(db, demot)
insert_data(db, demot)
select_data(db, demot)
fmt.Printf("\n======== start stmt mode test ========\n")
demodbStmt := "demodbStmt"
demotStmt := "demotStmt"
drop_database_stmt(db, demodbStmt)
create_database_stmt(db, demodbStmt)
use_database_stmt(db, demodbStmt)
create_table_stmt(db, demotStmt)
insert_data_stmt(db, demotStmt)
select_data_stmt(db, demotStmt)
fmt.Printf("\n======== end demo test ========\n")
}
func drop_database(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
res, err := db.Exec("drop database if exists " + demodb)
checkErr(err, "drop database if exists "+demodb)
affectd, err := res.RowsAffected()
checkErr(err, "drop db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("drop database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
//sleep 50毫秒
time.Sleep(time.Duration(50)* time.Millisecond)
}
func create_database(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// create database
res, err := db.Exec("create database " + demodb)
checkErr(err, "create db, db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "create db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
return
}
func use_database(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// use database
res, err := db.Exec("use " + demodb) // notes: must no quote to db name
checkErr(err, "use db db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "use db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("use database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func create_table(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// create table
res, err := db.Exec("create table " + demot + " (ts timestamp, id int, name binary(8), len tinyint, flag bool, notes binary(8), fv float, dv double)")
checkErr(err, "create table db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "create table res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create table result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func insert_data(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// insert data
res, err := db.Exec("insert into " + demot +
" values (now, 100, 'beijing', 10, true, 'one', 123.456, 123.456)" +
" (now+1s, 101, 'shanghai', 11, true, 'two', 789.123, 789.123)" +
" (now+2s, 102, 'shenzhen', 12, false, 'three', 456.789, 456.789)")
checkErr(err, "insert data, db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "insert data res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func select_data(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
rows, err := db.Query("select * from ? ", demot) // go text mode
checkErr(err, "select db.Query")
fmt.Printf("%10s%s%8s %5s %9s%s %s %8s%s %7s%s %8s%s %4s%s %5s%s\n", " ", "ts", " ", "id", " ", "name", " ", "len", " ", "flag", " ", "notes", " ", "fv", " ", " ", "dv")
var affectd int
//decoder := mahonia.NewDecoder("gbk") // 把原来ANSI格式的文本文件里的字符用gbk进行解码。
for rows.Next() {
var ts string
var name string
var id int
var len int8
var flag bool
var notes string
var fv float32
var dv float64
err = rows.Scan(&ts, &id, &name, &len, &flag, &notes, &fv, &dv)
checkErr(err, "select rows.Scan")
fmt.Printf("%s|\t", ts)
fmt.Printf("%d|\t", id)
fmt.Printf("%10s|\t", name)
fmt.Printf("%d|\t", len)
fmt.Printf("%t|\t", flag)
fmt.Printf("%s|\t", notes)
fmt.Printf("%06.3f|\t", fv)
fmt.Printf("%09.6f|\n\n", dv)
affectd++
}
et := time.Now().Nanosecond()
fmt.Printf("select data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
//fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func drop_database_stmt(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// drop test db
res, err := db.Exec("drop database if exists " + demodb)
checkErr(err, "drop database "+demodb)
affectd, err := res.RowsAffected()
checkErr(err, "drop db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("drop database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func create_database_stmt(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// create database
//var stmt interface{}
stmt, err := db.Prepare("create database ?")
checkErr(err, "create db, db.Prepare")
//var res driver.Result
res, err := stmt.Exec(demodb)
checkErr(err, "create db, stmt.Exec")
//fmt.Printf("Query OK, %d row(s) affected()", res.RowsAffected())
affectd, err := res.RowsAffected()
checkErr(err, "create db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func use_database_stmt(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// create database
//var stmt interface{}
stmt, err := db.Prepare("use " + demodb)
checkErr(err, "use db, db.Prepare")
res, err := stmt.Exec()
checkErr(err, "use db, stmt.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "use db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("use database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func create_table_stmt(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// create table
// (ts timestamp, id int, name binary(8), len tinyint, flag bool, notes binary(8), fv float, dv double)
stmt, err := db.Prepare("create table ? (? timestamp, ? int, ? binary(10), ? tinyint, ? bool, ? binary(8), ? float, ? double)")
checkErr(err, "create table db.Prepare")
res, err := stmt.Exec(demot, "ts", "id", "name", "len", "flag", "notes", "fv", "dv")
checkErr(err, "create table stmt.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "create table res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create table result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func insert_data_stmt(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// insert data into table
stmt, err := db.Prepare("insert into ? values(?, ?, ?, ?, ?, ?, ?, ?) (?, ?, ?, ?, ?, ?, ?, ?) (?, ?, ?, ?, ?, ?, ?, ?)")
checkErr(err, "insert db.Prepare")
res, err := stmt.Exec(demot, "now", 1000, "'haidian'", 6, true, "'AI world'", 6987.654, 321.987,
"now+1s", 1001, "'changyang'", 7, false, "'DeepMode'", 12356.456, 128634.456,
"now+2s", 1002, "'chuangping'", 8, true, "'database'", 3879.456, 65433478.456)
checkErr(err, "insert data, stmt.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func select_data_stmt(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
stmt, err := db.Prepare("select ?, ?, ?, ?, ?, ?, ?, ? from ?") // go binary mode
checkErr(err, "db.Prepare")
rows, err := stmt.Query("ts", "id", "name", "len", "flag", "notes", "fv", "dv", demot)
checkErr(err, "stmt.Query")
fmt.Printf("%10s%s%8s %5s %8s%s %s %10s%s %7s%s %8s%s %11s%s %14s%s\n", " ", "ts", " ", "id", " ", "name", " ", "len", " ", "flag", " ", "notes", " ", "fv", " ", " ", "dv")
var affectd int
for rows.Next() {
var ts string
var name string
var id int
var len int8
var flag bool
var notes string
var fv float32
var dv float64
err = rows.Scan(&ts, &id, &name, &len, &flag, &notes, &fv, &dv)
//fmt.Println("start scan fields from row.rs, &fv:", &fv)
//err = rows.Scan(&fv)
checkErr(err, "rows.Scan")
fmt.Printf("%s|\t", ts)
fmt.Printf("%d|\t", id)
fmt.Printf("%10s|\t", name)
fmt.Printf("%d|\t", len)
fmt.Printf("%t|\t", flag)
fmt.Printf("%s|\t", notes)
fmt.Printf("%06.3f|\t", fv)
fmt.Printf("%09.6f|\n", dv)
affectd++
}
et := time.Now().Nanosecond()
fmt.Printf("select data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func checkErr(err error, prompt string) {
if err != nil {
fmt.Printf("%s\n", prompt)
panic(err)
}
}

View File

@ -0,0 +1,70 @@
#!/bin/bash
##################################################
#
# Do go test
#
##################################################
set +e
#set -x
script_dir="$(dirname $(readlink -f $0))"
#echo "pwd: $script_dir, para0: $0"
execName=$0
execName=`echo ${execName##*/}`
goName=`echo ${execName%.*}`
###### step 1: start one taosd
scriptDir=$script_dir/../../script/sh
bash $scriptDir/stop_dnodes.sh
bash $scriptDir/deploy.sh -n dnode1 -i 1
bash $scriptDir/cfg.sh -n dnode1 -c walLevel -v 0
bash $scriptDir/exec.sh -n dnode1 -s start
###### step 2: set config item
TAOS_CFG=/etc/taos/taos.cfg
HOSTNAME=`hostname -f`
if [ ! -f ${TAOS_CFG} ]; then
touch -f $TAOS_CFG
fi
echo " " > $TAOS_CFG
echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG
echo "secondEp ${HOSTNAME}:7200" >> $TAOS_CFG
echo "serverPort 7100" >> $TAOS_CFG
#echo "dataDir $DATA_DIR" >> $TAOS_CFG
#echo "logDir $LOG_DIR" >> $TAOS_CFG
#echo "scriptDir ${CODE_DIR}/../script" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "tablemetakeeptimer 5" >> $TAOS_CFG
echo "wal 0" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG
echo "enableCoreFile 1" >> $TAOS_CFG
echo " " >> $TAOS_CFG
ulimit -n 600000
ulimit -c unlimited
#
##sudo sysctl -w kernel.core_pattern=$TOP_DIR/core.%p.%e
#
###### step 3: start build
cd $script_dir
rm -f go.*
go mod init $goName
go build
sleep 1s
sudo ./$goName

42
tests/gotest/test.sh Normal file
View File

@ -0,0 +1,42 @@
#!/bin/bash
##################################################
#
# Do go test
#
##################################################
set +e
#set -x
FILE_NAME=
RELEASE=0
while getopts "f:" arg
do
case $arg in
f)
FILE_NAME=$OPTARG
echo "input file: $FILE_NAME"
;;
?)
echo "unknow argument"
;;
esac
done
# start one taosd
bash ../script/sh/stop_dnodes.sh
bash ../script/sh/deploy.sh -n dnode1 -i 1
bash ../script/sh/cfg.sh -n dnode1 -c walLevel -v 0
bash ../script/sh/exec.sh -n dnode1 -s start
# start build test go file
caseDir=`echo ${FILE_NAME%/*}`
echo "caseDir: $caseDir"
cd $caseDir
rm go.*
go mod init $caseDir
go build
sleep 1s
./$caseDir

View File

@ -53,7 +53,7 @@ function buildTDengine {
function runGeneralCaseOneByOne {
while read -r line; do
if [[ $line =~ ^./test.sh* ]]; then
case=`echo $line | grep -w "general\|unique\/mnode\/mgmt33.sim\|unique\/stable\/dnode3.sim\|unique\/cluster\/balance3.sim\|unique\/arbitrator\/offline_replica2_alterTable_online.sim"|awk '{print $NF}'`
case=`echo $line | grep sim$ |awk '{print $NF}'`
if [ -n "$case" ]; then
./test.sh -f $case > /dev/null 2>&1 && \

View File

@ -16,6 +16,7 @@ python3 ./test.py -f insert/nchar.py
python3 ./test.py -f insert/nchar-unicode.py
python3 ./test.py -f insert/multi.py
python3 ./test.py -f insert/randomNullCommit.py
python3 insert/retentionpolicy.py
python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
@ -154,6 +155,7 @@ python3 ./test.py -f stream/new.py
python3 ./test.py -f stream/stream1.py
python3 ./test.py -f stream/stream2.py
python3 ./test.py -f stream/parser.py
python3 ./test.py -f stream/history.py
#alter table
python3 ./test.py -f alter/alter_table_crash.py
@ -192,3 +194,8 @@ python3 test.py -f query/queryInterval.py
# tools
python3 test.py -f tools/taosdemo.py
# subscribe
python3 test.py -f subscribe/singlemeter.py
#python3 test.py -f subscribe/stability.py
python3 test.py -f subscribe/supertable.py

View File

@ -0,0 +1,112 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
import datetime
sys.path.insert(0, os.getcwd())
import taos
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestRetetion:
def init(self):
self.queryRows=0
tdLog.debug("start to execute %s" % __file__)
tdLog.info("prepare cluster")
tdDnodes.init("")
tdDnodes.setTestCluster(False)
tdDnodes.setValgrind(False)
tdDnodes.stopAll()
tdDnodes.deploy(1)
tdDnodes.start(1)
print(tdDnodes.getDnodesRootDir())
self.conn = taos.connect(config=tdDnodes.getSimCfgPath())
tdSql.init(self.conn.cursor())
tdSql.execute('reset query cache')
def checkRows(self, expectRows,sql):
if self.queryRows == expectRows:
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (sql, self.queryRows, expectRows))
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, self.queryRows, expectRows)
os.system("timedatectl set-ntp true")
tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
def run(self):
tdLog.info("=============== step1")
tdSql.execute('create database test keep 3 days 1;')
tdSql.execute('use test;')
tdSql.execute('create table test(ts timestamp,i int);')
cmd = 'insert into test values(now-2d,11)(now-1d,11)(now,11)(now+1d,11);'
tdLog.info(cmd)
tdSql.execute(cmd)
tdSql.query('select * from test')
tdSql.checkRows(4)
tdLog.info("=============== step2")
tdDnodes.stop(1)
os.system("timedatectl set-ntp false")
os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
tdDnodes.start(1)
cmd = 'insert into test values(now,11);'
tdLog.info(cmd)
tdSql.execute(cmd)
queryRows=tdSql.query('select * from test')
if queryRows==4:
tdSql.checkRows(4)
return 0
else:
tdSql.checkRows(5)
tdLog.info("=============== step3")
tdDnodes.stop(1)
os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
tdDnodes.start(1)
cmd = 'insert into test values(now-1d,11);'
tdLog.info(cmd)
tdSql.execute(cmd)
queryRows=tdSql.query('select * from test')
tdSql.checkRows(6)
tdLog.info("=============== step4")
tdDnodes.stop(1)
tdDnodes.start(1)
cmd = 'insert into test values(now,11);'
tdLog.info(cmd)
tdSql.execute(cmd)
tdSql.query('select * from test')
tdSql.checkRows(7)
tdLog.info("=============== step5")
tdDnodes.stop(1)
tdDnodes.start(1)
cmd='select * from test where ts > now-1d'
queryRows=tdSql.query('select * from test where ts > now-1d')
self.checkRows(1,cmd)
def stop(self):
os.system("timedatectl set-ntp true")
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
clients = TDTestRetetion()
clients.init()
clients.run()
clients.stop()

View File

@ -0,0 +1,63 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
tdSql.execute("create table cars(ts timestamp, s int) tags(id int)")
tdSql.execute("create table car0 using cars tags(0)")
tdSql.execute("create table car1 using cars tags(1)")
tdSql.execute("create table car2 using cars tags(2)")
tdSql.execute("create table car3 using cars tags(3)")
tdSql.execute("create table car4 using cars tags(4)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:00.103', 1)")
tdSql.execute("insert into car1 values('2019-01-01 00:00:00.234', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:01.012', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:02.003', 1)")
tdSql.execute("insert into car2 values('2019-01-01 00:00:02.328', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:03.139', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:04.348', 1)")
tdSql.execute("insert into car0 values('2019-01-01 00:00:05.783', 1)")
tdSql.execute("insert into car1 values('2019-01-01 00:00:01.893', 1)")
tdSql.execute("insert into car1 values('2019-01-01 00:00:02.712', 1)")
tdSql.execute("insert into car1 values('2019-01-01 00:00:03.982', 1)")
tdSql.execute("insert into car3 values('2019-01-01 00:00:01.389', 1)")
tdSql.execute("insert into car4 values('2019-01-01 00:00:01.829', 1)")
tdSql.execute("create table strm as select count(*) from cars interval(4s)")
tdSql.waitedQuery("select * from strm", 2, 100)
tdSql.checkData(0, 1, 11)
tdSql.checkData(1, 1, 2)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -16,6 +16,7 @@ import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
@ -25,11 +26,30 @@ class TDTestCase:
self.numberOfTables = 10000
self.numberOfRecords = 100
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
tdSql.prepare()
os.system("yes | taosdemo -t %d -n %d" % (self.numberOfTables, self.numberOfRecords))
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
os.system("yes | %staosdemo -t %d -n %d" % (binPath,self.numberOfTables, self.numberOfRecords))
tdSql.execute("use test")
tdSql.query("select count(*) from meters")

View File

@ -220,3 +220,5 @@ run general/stream/table_del.sim
run general/stream/metrics_del.sim
run general/stream/table_replica1_vnoden.sim
run general/stream/metrics_replica1_vnoden.sim
run general/db/show_create_db.sim
run general/db/show_create_table.sim

View File

@ -218,7 +218,10 @@ if $data12_db != 1 then
return -1
endi
sql_error alter database db wal 2
sql alter database db wal 1
sql alter database db wal 2
sql alter database db wal 1
sql alter database db wal 2
sql_error alter database db wal 0
sql_error alter database db wal 3
sql_error alter database db wal 4
@ -226,11 +229,13 @@ sql_error alter database db wal -1
sql_error alter database db wal 1000
print ============== step fsync
sql_error alter database db fsync 2
sql_error alter database db fsync 3
sql_error alter database db fsync 4
sql alter database db fsync 0
sql alter database db fsync 1
sql alter database db fsync 3600
sql alter database db fsync 18000
sql alter database db fsync 180000
sql_error alter database db fsync 180001
sql_error alter database db fsync -1
sql_error alter database db fsync 1000
print ============== step comp
sql show databases

View File

@ -0,0 +1,32 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
print =============== step2
sql create database db
sql show create database db
if $rows != 1 then
return -1
endi
print =============== step3
sql use db
sql show create database db
if $rows != 1 then
return -1
endi
if $data00 != db then
return -1
endi
sql drop database db
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,87 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
print ===============create three type table
sql create database db
sql use db
sql create table meters(ts timestamp, f binary(8)) tags(loc int, zone binary(8))
sql create table t0 using meters tags(1,'ch')
sql create table normalTbl(ts timestamp, zone binary(8))
sql use db
sql show create table meters
if $rows != 1 then
return -1
endi
print ===============check sub table
sql show create table t0
if $rows != 1 then
return -1
endi
if $data00 == 't0' then
return -1
endi
print ===============check normal table
sql show create table normalTbl
if $rows != 1 then
return -1
endi
if $data00 == 'normalTbl' then
return -1
endi
print ===============check super table
sql show create table meters
if $rows != 1 then
return -1
endi
if $data00 == 'meters' then
return -1
endi
print ===============check sub table with prefix
sql show create table db.t0
if $rows != 1 then
return -1
endi
if $data00 == 't0' then
return -1
endi
print ===============check normal table with prefix
sql show create table db.normalTbl
if $rows != 1 then
return -1
endi
if $data00 == 'normalTbl' then
return -1
endi
print ===============check super table with prefix
sql show create table db.meters
if $rows != 1 then
return -1
endi
if $data00 == 'meters' then
return -1
endi
sql drop database db
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -3,7 +3,7 @@ sleep 3000
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 0
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode1 -c maxSQLLength -v 7340032
system sh/cfg.sh -n dnode1 -c maxSQLLength -v 340032
system sh/exec.sh -n dnode1 -s start
sleep 3000
@ -18,10 +18,22 @@ sql use d1
sql create table table_rest (ts timestamp, i int)
print sql length is 270KB
restful d1 table_rest 1591072800 10000 gzip
restful d1 table_rest 1591172800 10000 gzip
restful d1 table_rest 1591272800 10000 gzip
restful d1 table_rest 1591372800 10000 gzip
restful d1 table_rest 1591472800 10000 gzip
restful d1 table_rest 1591572800 10000 gzip
restful d1 table_rest 1591672800 10000 gzip
restful d1 table_rest 1591772800 10000 gzip
restful d1 table_rest 1591872800 10000 gzip
restful d1 table_rest 1591972800 10000 gzip
sql select * from table_rest;
print rows: $rows
if $rows != 10000 then
if $rows != 100000 then
return -1
endi
system curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'select * from d1.table_rest' 127.0.0.1:7111/rest/sql --compressed
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -308,13 +308,25 @@ sleep 2000
system sh/exec.sh -n dnode1 -s start
sql select * from wh_mt0 where c3 = 'abc' and tbname in ('test_null_filter');
sql_error select * from wh_mt0 where c3 = 'abc' and tbname in ('test_null_filter');
sql select * from wh_mt0 where c3 = '1' and tbname in ('test_null_filter');
if $row != 0 then
return -1
endi
sql select * from wh_mt0 where c3 = 'abc' and tbname in ('test_null_filter');
if $row != 0 then
sql select * from wh_mt0 where c3 = '1';
if $row == 0 then
return -1
endi
sql select * from wh_mt0 where c3 is null and tbname in ('test_null_filter');
if $rows != 10000 then
return -1
endi
sql select * from wh_mt0 where c3 is not null and tbname in ('test_null_filter');
if $rows != 0 then
return -1
endi

View File

@ -79,6 +79,7 @@ cd ../../../debug; make
./test.sh -f general/http/autocreate.sim
./test.sh -f general/http/chunked.sim
./test.sh -f general/http/gzip.sim
./test.sh -f general/http/restful.sim
./test.sh -f general/http/restful_insert.sim
./test.sh -f general/http/restful_limit.sim

View File

@ -128,6 +128,7 @@ echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG
echo "wdebugFlag 135" >> $TAOS_CFG
echo "cqdebugFlag 135" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG

View File

@ -34,11 +34,11 @@ system sh/cfg.sh -n dnode4 -c http -v 1
return
# for crash_gen
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 10
system sh/cfg.sh -n dnode1 -c rpcMaxTime -v 101
system sh/cfg.sh -n dnode1 -c cache -v 2
system sh/cfg.sh -n dnode1 -c keep -v 36500
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c walLevel -v 1
# for windows

View File

@ -42,9 +42,11 @@ $count = 2
while $count < 102
$db = d . $count
$tb = $db . .t
$tb2 = $db . .t2
sql create database $db replica 3 cache 1 blocks 3
sql create table $tb (ts timestamp, i int)
sql insert into $tb values(now, 1)
sql create table $tb2 as select count(*) from $tb interval(10s)
$count = $count + 1
print insert into $tb values(now, 1) ==> finished
endw
@ -74,7 +76,7 @@ print ============================== step6
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sleep 3000
sleep 10000
print ============================== step7

View File

@ -9,8 +9,9 @@ NC='\033[0m'
function runSimCaseOneByOne {
while read -r line; do
if [[ $line =~ ^run.* ]]; then
case=`echo $line | awk '{print $NF}'`
if [[ $line =~ ^./test.sh* ]]; then
case=`echo $line | grep sim$ |awk '{print $NF}'`
start_time=`date +%s`
./test.sh -f $case > /dev/null 2>&1 && \
echo -e "${GREEN}$case success${NC}" | tee -a out.log || \
@ -54,7 +55,7 @@ if [ "$2" != "python" ]; then
runSimCaseOneByOne regressionSuite.sim
elif [ "$1" == "full" ]; then
echo "### run TSIM full test ###"
runSimCaseOneByOne fullGeneralSuite.sim
runSimCaseOneByOne jenkins/basic.txt
elif [ "$1" == "smoke" ] || [ -z "$1" ]; then
echo "### run TSIM smoke test ###"
runSimCaseOneByOne basicSuite.sim