Merge branch 'develop' into feature/query

This commit is contained in:
Haojun Liao 2020-10-26 14:49:48 +08:00
commit 92eafa0719
74 changed files with 3046 additions and 1448 deletions

4
Jenkinsfile vendored
View File

@ -9,7 +9,7 @@ pipeline {
stage('Parallel test stage') {
parallel {
stage('pytest') {
agent{label 'master'}
agent{label '184'}
steps {
sh '''
date
@ -34,7 +34,7 @@ pipeline {
}
}
stage('test_b1') {
agent{label '184'}
agent{label 'master'}
steps {
sh '''
cd ${WKC}

View File

@ -61,7 +61,7 @@ The use of each configuration item is:
* **port**: This is the `http` service port which enables other application to manage rules by `restful API`.
* **database**: rules are stored in a `sqlite` database, this is the path of the database file (if the file does not exist, the alert application creates it automatically).
* **tdengine**: connection string of `TDEngine` server, note in most cases the database information should be put in a rule, thus it should NOT be included here.
* **tdengine**: connection string of `TDEngine` server, note the database name should be put in the `sql` field of a rule in most cases, thus it should NOT be included in the string.
* **log > level**: log level, could be `production` or `debug`.
* **log > path**: log output file path.
* **receivers > alertManager**: the alert application pushes alerts to `AlertManager` at this URL.

View File

@ -58,7 +58,7 @@ $ go build
* **port**:报警监测程序支持使用 `restful API` 对规则进行管理,这个参数用于配置 `http` 服务的侦听端口。
* **database**:报警监测程序将规则保存到了一个 `sqlite` 数据库中,这个参数用于指定数据库文件的路径(不需要提前创建这个文件,如果它不存在,程序会自动创建它)。
* **tdengine**`TDEngine` 的连接信息,一般来说,数据库信息应该在报警规则中指定,所以这里 **不** 应包含这一部分信息
* **tdengine**`TDEngine` 的连接字符串,一般来说,数据库名应该在报警规则的 `sql` 语句中指定,所以这个字符串中 **不** 应包含数据库名
* **log > level**:日志的记录级别,可选 `production``debug`
* **log > path**:日志文件的路径。
* **receivers > alertManager**:报警监测程序会将报警推送到 `AlertManager`,在这里指定 `AlertManager` 的接收地址。

View File

@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.0.5.1")
SET(TD_VER_NUMBER "2.0.6.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)

View File

@ -51,8 +51,8 @@ INTERVAL(1M)</code></pre>
<li><p>mseconds查询数据库更新的时间间隔单位为毫秒。一般设置为1000毫秒。返回值为指向TDengine_SUB 结构的指针,如果返回为空,表示失败。</p></li>
</ul><li><p><code>TAOS_ROW taos_consume(TAOS_SUB *tsub)</code>
</p><p>该函数用来获取订阅的结果用户应用程序将其置于一个无限循环语句。如果数据库有新记录到达该API将返回该最新的记录。如果没有新的记录该API将阻塞。如果返回值为空说明系统出错。参数说明</p></li><ul><li><p>tsubtaos_subscribe的结构体指针。</p></li></ul><li><p><code>void taos_unsubscribe(TAOS_SUB *tsub)</code></p><p>取消订阅。应用程序退出时,务必调用该函数以避免资源泄露。</p></li>
<li><p><code>int taos_num_subfields(TAOS_SUB *tsub)</code></p><p>获取返回的一行记录中数据包含多少列。</p></li>
<li><p><code>TAOS_FIELD *taos_fetch_subfields(TAOS_SUB *tsub)</code></p><p>获取每列数据的属性数据类型、名字、长度与taos_num_subfileds配合使用可解析返回的每行数据。</p></li></ul>
<li><p><code>int taos_num_fields(TAOS_SUB *tsub)</code></p><p>获取返回的一行记录中数据包含多少列。</p></li>
<li><p><code>TAOS_FIELD *taos_fetch_fields(TAOS_SUB *tsub)</code></p><p>获取每列数据的属性数据类型、名字、长度与taos_num_subfileds配合使用可解析返回的每行数据。</p></li></ul>
<p>示例代码:请看安装包中的的示范程序</p>
<a class='anchor' id='缓存-(Cache)'></a><h2>缓存 (Cache)</h2>
<p>TDengine采用时间驱动缓存管理策略First-In-First-OutFIFO又称为写驱动的缓存管理机制。这种策略有别于读驱动的数据缓存模式Least-Recent-UseLRU直接将最近写入的数据保存在系统的缓存中。当缓存达到临界值的时候将最早的数据批量写入磁盘。一般意义上来说对于物联网数据的使用用户最为关心最近产生的数据即当前状态。TDengine充分利用了这一特性将最近到达的当前状态数据保存在缓存中。</p>

View File

@ -64,9 +64,9 @@
<p>该API用来获取最新消息应用程序一般会将其置于一个无限循环语句中。其中参数tsub是taos_subscribe的返回值。如果数据库有新的记录该API将返回返回参数是一行记录。如果没有新的记录该API将阻塞。如果返回值为空说明系统出错需要检查系统是否还在正常运行。</p></li>
<li><p><code>void taos_unsubscribe(TAOS_SUB *tsub)</code></p>
<p>该API用于取消订阅参数tsub是taos_subscribe的返回值。应用程序退出时需要调用该API否则有资源泄露。</p></li>
<li><p><code>int taos_num_subfields(TAOS_SUB *tsub)</code></p>
<li><p><code>int taos_num_fields(TAOS_SUB *tsub)</code></p>
<p>该API用来获取返回的一排数据中数据的列数</p></li>
<li><p><code>TAOS_FIELD *taos_fetch_subfields(TAOS_RES *res)</code></p>
<li><p><code>TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)</code></p>
<p>该API用来获取每列数据的属性数据类型、名字、字节数与taos_num_subfileds配合使用可用来解析返回的一排数据。</p></li>
</ul>
<a class='anchor' id='Java-Connector'></a><h2>Java Connector</h2>
@ -259,4 +259,4 @@ conn.close()
_ "taosSql"
)</code></pre>
<p>taosSql驱动包内采用cgo模式调用了TDengine的C/C++同步接口与TDengine进行交互因此在数据库操作执行完成之前客户端应用将处于阻塞状态。单个数据库连接在同一时刻只能有一个线程调用API。客户应用可以建立多个连接进行多线程的数据写入或查询处理。</p>
<p>更多使用的细节,请参考下载目录中的示例源码。</p><a href='../index.html'>回去</a></section></main></div><?php include($s.'/footer.php'); ?><script>$('pre').addClass('prettyprint linenums');PR.prettyPrint()</script><script src='lib/docs/liner.js'></script></body></html>
<p>更多使用的细节,请参考下载目录中的示例源码。</p><a href='../index.html'>回去</a></section></main></div><?php include($s.'/footer.php'); ?><script>$('pre').addClass('prettyprint linenums');PR.prettyPrint()</script><script src='lib/docs/liner.js'></script></body></html>

View File

@ -72,9 +72,9 @@ The API is used to start a subscription session by given a handle. The parameter
The API used to get the new data from a TDengine server. It should be put in an infinite loop. The parameter <em>tsub</em> is the handle returned by <em>taos_subscribe</em>. If new data are updated, the API will return a row of the result. Otherwise, the API is blocked until new data arrives. If <em>NULL</em> pointer is returned, it means an error occurs.</p></li>
<li><p><code>void taos_unsubscribe(TAOS_SUB *tsub)</code>
Stop a subscription session by the handle returned by <em>taos_subscribe</em>.</p></li>
<li><p><code>int taos_num_subfields(TAOS_SUB *tsub)</code>
<li><p><code>int taos_num_fields(TAOS_SUB *tsub)</code>
The API used to get the number of fields in a row.</p></li>
<li><p><code>TAOS_FIELD *taos_fetch_subfields(TAOS_RES *res)</code>
<li><p><code>TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)</code>
The API used to get the description of each column.</p></li>
</ul>
<a class='anchor' id='Java-Connector'></a><h2>Java Connector</h2>
@ -351,4 +351,4 @@ promise2.then(function(result) {
})</code></pre>
<h3>Example</h3>
<p>An example of using the NodeJS connector to create a table with weather data and create and execute queries can be found <a href="https://github.com/taosdata/TDengine/tree/master/tests/examples/nodejs/node-example.js">here</a> (The preferred method for using the connector)</p>
<p>An example of using the NodeJS connector to achieve the same things but without all the object wrappers that wrap around the data returned to achieve higher functionality can be found <a href="https://github.com/taosdata/TDengine/tree/master/tests/examples/nodejs/node-example-raw.js">here</a></p><a href='../index.html'>Back</a></section></main></div><?php include($s.'/footer.php'); ?><script>$('pre').addClass('prettyprint linenums');PR.prettyPrint()</script><script src='lib/docs/liner.js'></script></body></html>
<p>An example of using the NodeJS connector to achieve the same things but without all the object wrappers that wrap around the data returned to achieve higher functionality can be found <a href="https://github.com/taosdata/TDengine/tree/master/tests/examples/nodejs/node-example-raw.js">here</a></p><a href='../index.html'>Back</a></section></main></div><?php include($s.'/footer.php'); ?><script>$('pre').addClass('prettyprint linenums');PR.prettyPrint()</script><script src='lib/docs/liner.js'></script></body></html>

View File

@ -228,7 +228,7 @@ TDengine采用数据驱动的方式让缓存中的数据写入硬盘进行持久
为充分利用时序数据特点TDengine将一个vnode保存在持久化存储的数据切分成多个文件每个文件只保存固定天数的数据这个天数由系统配置参数days决定。切分成多个文件后给定查询的起止日期无需任何索引就可以立即定位需要打开哪些数据文件大大加快读取速度。
对于采集的数据一般有保留时长这个时长由系统配置参数keep决定。超过这个设置天数的数据文件将被系统自动删除,释放存储空间。
对于采集的数据一般有保留时长这个时长由系统配置参数keep决定。超过这个设置天数的数据文件将被系统自动删除释放存储空间。
给定days与keep两个参数一个vnode总的数据文件数为keep/days。总的数据文件个数不宜过大也不宜过小。10到100以内合适。基于这个原则可以设置合理的days。 目前的版本参数keep可以修改但对于参数days一但设置后不可修改。

View File

@ -61,7 +61,7 @@ vnode与其子模块是通过API直接调用而不是通过消息队列传递
mnode是整个系统的大脑负责整个系统的资源调度负责meta data的管理与存储。
一个运行的系统里只有一个mnode但它有多个副本由系统配置参数numOfMpeers控制。这些副本分布在不同的dnode里目的是保证系统的高可靠运行。副本之间的数据复制是采用同步而非异步的方式以确保数据的一致性确保数据不会丢失。这些副本会自动选举一个Master其他副本是slave。所有数据更新类的操作都只能在master上进行而查询类的可以在slave节点上进行。代码实现上同步模块与vnode共享但mnode被分配一个特殊的vgroup ID: 1而且quorum大于1。整个集群系统是由多个dnode组成的运行的mnode的副本数不可能超过dnode的个数但不会超过配置的副本数。如果某个mnode副本宕机一段时间只要超过半数的mnode副本仍在运行运行的mnode会自动根据整个系统的资源情况在其他dnode里再启动一个mnode, 以保证运行的副本数。
一个运行的系统里只有一个mnode但它有多个副本由系统配置参数numOfMnodes控制。这些副本分布在不同的dnode里目的是保证系统的高可靠运行。副本之间的数据复制是采用同步而非异步的方式以确保数据的一致性确保数据不会丢失。这些副本会自动选举一个Master其他副本是slave。所有数据更新类的操作都只能在master上进行而查询类的可以在slave节点上进行。代码实现上同步模块与vnode共享但mnode被分配一个特殊的vgroup ID: 1而且quorum大于1。整个集群系统是由多个dnode组成的运行的mnode的副本数不可能超过dnode的个数但不会超过配置的副本数。如果某个mnode副本宕机一段时间只要超过半数的mnode副本仍在运行运行的mnode会自动根据整个系统的资源情况在其他dnode里再启动一个mnode, 以保证运行的副本数。
各个dnode通过信息交换保存有mnode各个副本的End Point列表并向其中的master节点定时间隔由系统配置参数statusInterval控制发送status消息消息体里包含该dnode的CPU、内存、剩余存储空间、vnode个数以及各个vnode的状态(存储空间、原始数据大小、记录条数、角色等。这样mnode就了解整个系统的资源情况如果用户创建新的表就可以决定需要在哪个dnode创建如果增加或删除dnode, 或者监测到某dnode数据过热、或离线太长就可以决定需要挪动那些vnode以实现负载均衡。

View File

@ -123,7 +123,7 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp -r ${examples_dir}/R ${install_dir}/examples
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/R/command.txt
cp -r ${examples_dir}/go ${install_dir}/examples
sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/src/taosapp/taosapp.go
sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/taosdemo.go
fi
# Copy driver
mkdir -p ${install_dir}/driver

View File

@ -146,7 +146,7 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
cp -r ${examples_dir}/R ${install_dir}/examples
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/R/command.txt
cp -r ${examples_dir}/go ${install_dir}/examples
sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/src/taosapp/taosapp.go
sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/taosdemo.go
fi
# Copy driver
mkdir -p ${install_dir}/driver

View File

@ -10,6 +10,7 @@ data_dir="/var/lib/taos"
log_dir="/var/log/taos"
data_link_dir="/usr/local/taos/data"
log_link_dir="/usr/local/taos/log"
install_main_dir="/usr/local/taos"
# static directory
cfg_dir="/usr/local/taos/cfg"
@ -134,6 +135,29 @@ function install_config() {
else
break
fi
done
# user email
#EMAIL_PATTERN='^[A-Za-z0-9\u4e00-\u9fa5]+@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)+$'
#EMAIL_PATTERN='^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$'
#EMAIL_PATTERN="^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$"
echo
echo -e -n "${GREEN}Enter your email address for priority support or enter empty to skip${NC}: "
read emailAddr
while true; do
if [ ! -z "$emailAddr" ]; then
# check the format of the emailAddr
#if [[ "$emailAddr" =~ $EMAIL_PATTERN ]]; then
# Write the email address to temp file
email_file="${install_main_dir}/email"
${csudo} bash -c "echo $emailAddr > ${email_file}"
break
#else
# read -p "Please enter the correct email address: " emailAddr
#fi
else
break
fi
done
}

View File

@ -39,7 +39,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql);
void **doSetResultRowData(SSqlObj *pSql, bool finalResult);
TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult);
#ifdef __cplusplus
}

View File

@ -309,7 +309,7 @@ typedef struct {
int32_t numOfGroups;
SResRec * pGroupRec;
char * data;
void ** tsrow;
TAOS_ROW tsrow;
int32_t* length; // length for each field for current row
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex * pColumnIndex;
@ -450,9 +450,8 @@ void tscCloseTscObj(STscObj *pObj);
// todo move to taos? or create a new file: taos_internal.h
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos);
void *param, TAOS **taos);
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, TAOS_RES** res);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen);
@ -483,11 +482,11 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
pData = pInfo->pSqlExpr->param[1].pz;
pRes->length[columnIndex] = pInfo->pSqlExpr->param[1].nLen;
pRes->tsrow[columnIndex] = (pInfo->pSqlExpr->param[1].nType == TSDB_DATA_TYPE_NULL) ? NULL : pData;
pRes->tsrow[columnIndex] = (pInfo->pSqlExpr->param[1].nType == TSDB_DATA_TYPE_NULL) ? NULL : (unsigned char*)pData;
} else {
assert(bytes == tDataTypeDesc[type].nSize);
pRes->tsrow[columnIndex] = isNull(pData, type) ? NULL : &pInfo->pSqlExpr->param[1].i64Key;
pRes->tsrow[columnIndex] = isNull(pData, type) ? NULL : (unsigned char*)&pInfo->pSqlExpr->param[1].i64Key;
pRes->length[columnIndex] = bytes;
}
} else {
@ -495,7 +494,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
int32_t realLen = varDataLen(pData);
assert(realLen <= bytes - VARSTR_HEADER_SIZE);
pRes->tsrow[columnIndex] = (isNull(pData, type)) ? NULL : ((tstr *)pData)->data;
pRes->tsrow[columnIndex] = (isNull(pData, type)) ? NULL : (unsigned char*)((tstr *)pData)->data;
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
*(pData + realLen + VARSTR_HEADER_SIZE) = 0;
}
@ -504,7 +503,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
} else {
assert(bytes == tDataTypeDesc[type].nSize);
pRes->tsrow[columnIndex] = isNull(pData, type) ? NULL : pData;
pRes->tsrow[columnIndex] = isNull(pData, type) ? NULL : (unsigned char*)pData;
pRes->length[columnIndex] = bytes;
}
}

View File

@ -296,7 +296,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
return tscSetValueToResObj(pSql, rowLen);
}
static int32_t tscGetNthFieldResult(TAOS_ROW row, TAOS_FIELD* fields, int *lengths, int idx, char *result) {
const char *val = row[idx];
const char *val = (const char*)row[idx];
if (val == NULL) {
sprintf(result, "%s", TSDB_DATA_NULL_STR);
return -1;

View File

@ -20,8 +20,10 @@
#include "tutil.h"
#include "taosmsg.h"
#include "taos.h"
void tscSaveSlowQueryFp(void *handle, void *tmrId);
void *tscSlowQueryConn = NULL;
TAOS *tscSlowQueryConn = NULL;
bool tscSlowQueryConnInitialized = false;
void tscInitConnCb(void *param, TAOS_RES *result, int code) {

View File

@ -1380,7 +1380,7 @@ static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
pRes->tsrow[i] = ((char*) pRes->data + offset * pRes->numOfRows);
pRes->tsrow[i] = (unsigned char*)((char*) pRes->data + offset * pRes->numOfRows);
}
return 0;

View File

@ -51,8 +51,8 @@ static bool validPassword(const char* passwd) {
return validImpl(passwd, TSDB_PASSWORD_LEN - 1);
}
SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, const char *auth, const char *db,
uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, const char *auth, const char *db,
uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, TAOS **taos) {
taos_init();
if (!validUserName(user)) {
@ -243,16 +243,19 @@ static void asyncConnCallback(void *param, TAOS_RES *tres, int code) {
}
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) {
SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, taos);
void *param, TAOS **taos) {
STscObj *pObj = NULL;
SSqlObj *pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, (void **)&pObj);
if (pSql == NULL) {
return NULL;
}
if (taos) *taos = pObj;
pSql->fetchFp = fp;
pSql->res.code = tscProcessSql(pSql);
tscDebug("%p DB async connection is opening", taos);
return taos;
return pObj;
}
void taos_close(TAOS *taos) {
@ -897,7 +900,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
if (pSql->sqlstr == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql);
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pSql), pObj);
taosTFree(pSql);
return pRes->code;
}
@ -922,7 +925,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
}
if (code != TSDB_CODE_SUCCESS) {
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj);
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(pSql), pObj);
}
taos_free_result(pSql);
@ -1066,7 +1069,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
tscDoQuery(pSql);
tscDebug("%p load multi table meta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
tscDebug("%p load multi table meta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(pSql), pObj);
if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
}

View File

@ -2260,7 +2260,7 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
int32_t length = taosUcs4ToMbs(pRes->tsrow[columnIndex], pRes->length[columnIndex], pRes->buffer[columnIndex]);
if ( length >= 0 ) {
pRes->tsrow[columnIndex] = pRes->buffer[columnIndex];
pRes->tsrow[columnIndex] = (unsigned char*)pRes->buffer[columnIndex];
pRes->length[columnIndex] = length;
} else {
tscError("%p charset:%s to %s. val:%s convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)pRes->tsrow[columnIndex]);
@ -2288,7 +2288,7 @@ static char *getArithemicInputSrc(void *param, const char *name, int32_t colId)
return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}
void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
@ -2341,7 +2341,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup,
TSDB_ORDER_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
pRes->tsrow[i] = (unsigned char*)pRes->buffer[i];
}
}

View File

@ -704,16 +704,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd;
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, 0);
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
size_t total = taosArrayGetSize(pTableDataBlockList);
for (int32_t i = 0; i < total; ++i) {
pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
STableDataBlocks* dataBuf = NULL;
int32_t ret =

@ -1 +1 @@
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b

View File

@ -97,7 +97,7 @@ void dnodeCleanupModules() {
}
}
if (tsModule[TSDB_MOD_MNODE].enable && tsModule[TSDB_MOD_MNODE].cleanUpFp) {
if (tsModule[TSDB_MOD_MNODE].cleanUpFp) {
(*tsModule[TSDB_MOD_MNODE].cleanUpFp)();
}
}

View File

@ -190,6 +190,7 @@ void dnodeFreeVnodeWqueue(void *wqueue) {
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
SWriteMsg *pWrite = (SWriteMsg *)param;
if (pWrite == NULL) return;
if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);

View File

@ -22,12 +22,12 @@
extern "C" {
#endif
typedef void TAOS;
typedef void** TAOS_ROW;
typedef void TAOS_RES;
typedef void TAOS_SUB;
typedef void TAOS_STREAM;
typedef void TAOS_STMT;
typedef void TAOS;
typedef void TAOS_STMT;
typedef void TAOS_RES;
typedef void TAOS_STREAM;
typedef void TAOS_SUB;
typedef void **TAOS_ROW;
// Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes

View File

@ -56,6 +56,7 @@ void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
@ -65,6 +66,7 @@ int32_t vnodeInitResources();
void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
int32_t vnodeCheckRead(void *pVnode);
#ifdef __cplusplus
}

View File

@ -60,7 +60,7 @@ typedef struct SShellArguments {
extern void shellParseArgument(int argc, char* argv[], SShellArguments* arguments);
extern TAOS* shellInit(SShellArguments* args);
extern void* shellLoopQuery(void* arg);
extern void taos_error(TAOS* con);
extern void taos_error(TAOS_RES* tres);
extern int regex_match(const char* s, const char* reg, int cflags);
void shellReadCommand(TAOS* con, char command[]);
int32_t shellRunCommand(TAOS* con, char* command);
@ -72,7 +72,7 @@ void source_dir(TAOS* con, SShellArguments* args);
void get_history_path(char* history);
void cleanup_handler(void* arg);
void exitShell();
int shellDumpResult(TAOS* con, char* fname, int* error_no, bool printMode);
int shellDumpResult(TAOS_RES* con, char* fname, int* error_no, bool printMode);
void shellGetGrantInfo(void *con);
int isCommentLine(char *line);

View File

@ -493,7 +493,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* tres) {
if (i > 0) {
fputc(',', fp);
}
dumpFieldToFile(fp, row[i], fields +i, length[i], precision);
dumpFieldToFile(fp, (const char*)row[i], fields +i, length[i], precision);
}
fputc('\n', fp);
@ -619,7 +619,7 @@ static int verticalPrintResult(TAOS_RES* tres) {
int padding = (int)(maxColNameLen - strlen(field->name));
printf("%*.s%s: ", padding, " ", field->name);
printField(row[i], field, 0, length[i], precision);
printField((const char*)row[i], field, 0, length[i], precision);
putchar('\n');
}
@ -720,7 +720,7 @@ static int horizontalPrintResult(TAOS_RES* tres) {
int32_t* length = taos_fetch_lengths(tres);
for (int i = 0; i < num_fields; i++) {
putchar(' ');
printField(row[i], fields + i, width[i], length[i], precision);
printField((const char*)row[i], fields + i, width[i], length[i], precision);
putchar(' ');
putchar('|');
}

View File

@ -204,7 +204,7 @@ static void shellSourceFile(TAOS *con, char *fptr) {
int32_t code = taos_errno(pSql);
if (code != 0) {
fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo);
fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(pSql), fname, lineNo);
}
/* free local resouce: allocated memory/metric-meta refcnt */
@ -243,7 +243,7 @@ static void shellRunImportThreads(SShellArguments* args)
pThread->totalThreads = args->threadNum;
pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsDnodeShellPort);
if (pThread->taos == NULL) {
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taos));
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, "null taos"/*taos_errstr(pThread->taos)*/);
exit(0);
}

View File

@ -90,11 +90,12 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
}
} else if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
if (strlen(argv[++i]) >= TSDB_FILENAME_LEN) {
fprintf(stderr, "config file path: %s overflow max len %d\n", argv[i], TSDB_FILENAME_LEN - 1);
char *tmp = argv[++i];
if (strlen(tmp) >= TSDB_FILENAME_LEN) {
fprintf(stderr, "config file path: %s overflow max len %d\n", tmp, TSDB_FILENAME_LEN - 1);
exit(EXIT_FAILURE);
}
strcpy(configDir, argv[++i]);
strcpy(configDir, tmp);
} else {
fprintf(stderr, "Option -c requires an argument\n");
exit(EXIT_FAILURE);

View File

@ -123,16 +123,18 @@ int32_t mnodeInitSystem() {
}
void mnodeCleanupSystem() {
mInfo("starting to clean up mnode");
tsMgmtIsRunning = false;
if (tsMgmtIsRunning) {
mInfo("starting to clean up mnode");
tsMgmtIsRunning = false;
dnodeFreeMnodeWqueue();
dnodeFreeMnodeRqueue();
dnodeFreeMnodePqueue();
mnodeCleanupTimer();
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
mInfo("mnode is cleaned up");
dnodeFreeMnodeWqueue();
dnodeFreeMnodeRqueue();
dnodeFreeMnodePqueue();
mnodeCleanupTimer();
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
mInfo("mnode is cleaned up");
}
}
void mnodeStopSystem() {

View File

@ -122,9 +122,9 @@ typedef struct {
} HttpDecodeMethod;
typedef struct {
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result);
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result);
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows);
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int numOfRows);
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
void (*initJsonFp)(struct HttpContext *pContext);
void (*cleanJsonFp)(struct HttpContext *pContext);
@ -148,7 +148,7 @@ typedef struct HttpContext {
char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[TSDB_PASSWORD_LEN];
void * taos;
TAOS * taos;
void * ppContext;
HttpSession *session;
z_stream gzipStream;

View File

@ -217,7 +217,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
httpJsonStringForTransMean(jsonBuf, row[i], fields[i].bytes);
httpJsonStringForTransMean(jsonBuf, (char*)row[i], fields[i].bytes);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (precision == TSDB_TIME_PRECISION_MILLI) { //ms

View File

@ -131,7 +131,7 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
httpJsonStringForTransMean(jsonBuf, row[i], length[i]);
httpJsonStringForTransMean(jsonBuf, (char*)row[i], length[i]);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (timestampFormat == REST_TIMESTAMP_FMT_LOCAL_STRING) {
@ -195,4 +195,4 @@ void restStopSqlJson(HttpContext *pContext, HttpSqlCmd *cmd) {
httpJsonToken(jsonBuf, JsonObjEnd);
httpWriteJsonBufEnd(jsonBuf);
}
}

View File

@ -27,8 +27,6 @@
#include "httpSession.h"
#include "httpQueue.h"
void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos);
void httpProcessMultiSql(HttpContext *pContext);
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows);

View File

@ -6870,7 +6870,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
*buildRes = false;
if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
qDebug("QInfo:%p query is killed, code:0x%08x", pQInfo, pQInfo->code);
return pQInfo->code;
}
@ -7290,13 +7290,20 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
void** qAcquireQInfo(void* pMgmt, uint64_t _key) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
if (pQueryMgmt->closed) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
}
if (pQueryMgmt->qinfoPool == NULL) {
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
return NULL;
}
TSDB_CACHE_PTR_TYPE key = (TSDB_CACHE_PTR_TYPE)_key;
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(TSDB_CACHE_PTR_TYPE));
if (handle == NULL || *handle == NULL) {
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
return NULL;
} else {
return handle;

View File

@ -344,8 +344,6 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
return 0;
}
assert(colIdx == 0);
if (tsOrder == TSDB_ORDER_DESC) { // primary column desc order
return (f1 < f2) ? 1 : -1;
} else { // asc

View File

@ -174,12 +174,15 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true;
eventfd_t fd = -1;
// save thread into local variable since pThreadObj is freed when thread exits
pthread_t thread = pThreadObj->thread;
if (taosComparePthread(pThreadObj->thread, pthread_self())) {
pthread_detach(pthread_self());
return;
}
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
if (taosCheckPthreadValid(pThreadObj->thread)) {
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
@ -196,8 +199,9 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
}
}
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
pthread_join(pThreadObj->thread, NULL);
// at this step, pThreadObj has already been released
if (taosCheckPthreadValid(thread)) {
pthread_join(thread, NULL);
}
if (fd != -1) taosCloseSocket(fd);

View File

@ -5,14 +5,14 @@ INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX)
LIST(REMOVE_ITEM SRC ./src/tarbitrator.c)
LIST(REMOVE_ITEM SRC src/tarbitrator.c)
ADD_LIBRARY(sync ${SRC})
TARGET_LINK_LIBRARIES(sync tutil pthread common)
LIST(APPEND BIN_SRC ./src/tarbitrator.c)
LIST(APPEND BIN_SRC ./src/taosTcpPool.c)
LIST(APPEND BIN_SRC src/tarbitrator.c)
LIST(APPEND BIN_SRC src/taosTcpPool.c)
ADD_EXECUTABLE(tarbitrator ${BIN_SRC})
TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil)
ADD_SUBDIRECTORY(test)
#ADD_SUBDIRECTORY(test)
ENDIF ()

View File

@ -465,9 +465,10 @@ void *vnodeAcquireRqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_APP_NOT_READY;
vInfo("vgId:%d, status is in reset", vgId);
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
vInfo("vgId:%d, can not provide read service, status is %s", vgId, vnodeStatus[pVnode->status]);
vnodeRelease(pVnode);
return NULL;
}
@ -479,13 +480,14 @@ void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_APP_NOT_READY;
vInfo("vgId:%d, status is in reset", vgId);
int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]);
vnodeRelease(pVnode);
return NULL;
}
return pVnode->wqueue;
}

View File

@ -39,7 +39,13 @@ void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
}
static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
//
// After the fetch request enters the vnode queue, if the vnode cannot provide services, the process function are
// still required, or there will be a deadlock, so we dont do any check here, but put the check codes before the
// request enters the queue
//
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType;
if (vnodeProcessReadMsgFp[msgType] == NULL) {
@ -47,53 +53,35 @@ static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
}
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}
int32_t vnodeCheckRead(void *param) {
SVnodeObj *pVnode = param;
if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %s", pVnode->vgId, taosMsg[msgType],
vnodeStatus[pVnode->status]);
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
// tsdb may be in reset state
if (pVnode->tsdb == NULL) {
vDebug("vgId:%d, msgType:%s not processed, tsdb is null", pVnode->vgId, taosMsg[msgType]);
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
vDebug("vgId:%d, msgType:%s not processed, vstatus is %s", pVnode->vgId, taosMsg[msgType],
vnodeStatus[pVnode->status]);
vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType],
pVnode->syncCfg.replica, syncRole[pVnode->role]);
vDebug("vgId:%d, replica:%d role:%s, recCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica,
syncRole[pVnode->role], pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
return TSDB_CODE_SUCCESS;
}
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
int32_t vnodeProcessRead(void *param, SReadMsg *pRead) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int32_t code = vnodeProcessReadImp(pVnode, pRead);
if (code == TSDB_CODE_APP_NOT_READY && pRead->rpcMsg.msgType == TSDB_MSG_TYPE_QUERY) {
// After the fetch request enters the vnode queue
// If the vnode cannot provide services, the following operations are still required
// Or, there will be a deadlock
void **qhandle = (void **)pRead->pCont;
vError("QInfo:%p msg:%p will be killed for vstatus is %s", *qhandle, pRead, vnodeStatus[pVnode->status]);
// qKillQuery(*qhandle);
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
return TSDB_CODE_APP_NOT_READY;
} else {
return code;
}
}
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
@ -104,6 +92,8 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *a
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
return TSDB_CODE_SUCCESS;
}
/**
@ -122,8 +112,13 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle,
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) {
*freeHandle = false;
vnodePutItemIntoReadQueue(pVnode, handle, ahandle);
pRet->qhandle = *handle;
code = vnodePutItemIntoReadQueue(pVnode, handle, ahandle);
if (code != TSDB_CODE_SUCCESS) {
*freeHandle = true;
return code;
} else {
pRet->qhandle = *handle;
}
} else {
*freeHandle = true;
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
@ -226,7 +221,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle);
code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle);
if (code != TSDB_CODE_SUCCESS) {
pRsp->code = code;
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
return pRsp->code;
}
}
} else {
assert(pCont != NULL);
@ -287,16 +287,22 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet, 0, sizeof(SRspRet));
terrno = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
if (handle == NULL || (*handle) != (void *)pRetrieve->qhandle) {
if (handle == NULL) {
code = terrno;
terrno = TSDB_CODE_SUCCESS;
} else if ((*handle) != (void *)pRetrieve->qhandle) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void *)pRetrieve->qhandle);
}
if (code != TSDB_CODE_SUCCESS) {
vDebug("vgId:%d, invalid handle in retrieving result, code:0x%08x, QInfo:%p", pVnode->vgId, code, (void *)pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle);

View File

@ -56,15 +56,6 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
}
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, msgType:%s not processed, no write auth", pVnode->vgId, taosMsg[pHead->msgType]);
return TSDB_CODE_VND_NO_WRITE_AUTH;
}
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
@ -105,6 +96,28 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return syncCode;
}
int32_t vnodeCheckWrite(void *param) {
SVnodeObj *pVnode = param;
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_VND_NO_WRITE_AUTH;
}
// tsdb may be in reset state
if (pVnode->tsdb == NULL) {
vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY;
}
return TSDB_CODE_SUCCESS;
}
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
SVnodeObj *pVnode = (SVnodeObj *)param;
syncConfirmForward(pVnode->sync, version, code);

View File

@ -60,6 +60,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
static int walRemoveWalFiles(const char *path);
static void walProcessFsyncTimer(void *param, void *tmrId);
static void walRelease(SWal *pWal);
static int walGetMaxOldFileId(char *odir);
static void walModuleInitFunc() {
walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
@ -312,7 +313,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
for (index = minId; index <= maxId; ++index) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index);
terrno = walRestoreWalFile(pWal, pVnode, writeFp);
if (terrno < 0) break;
if (terrno < 0) continue;
}
}
@ -476,31 +477,26 @@ int walHandleExistingFiles(const char *path) {
int plen = strlen(walPrefix);
terrno = 0;
if (access(opath, F_OK) == 0) {
// old directory is there, it means restore process is not finished
walRemoveWalFiles(path);
} else {
// move all files to old directory
int count = 0;
while ((ent = readdir(dir)) != NULL) {
if (strncmp(ent->d_name, walPrefix, plen) == 0) {
snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name);
snprintf(nname, sizeof(nname), "%s/old/%s", path, ent->d_name);
if (taosMkDir(opath, 0755) != 0) {
wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (rename(oname, nname) < 0) {
wError("wal:%s, failed to move to new:%s", oname, nname);
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
count++;
int midx = walGetMaxOldFileId(opath);
int count = 0;
while ((ent = readdir(dir)) != NULL) {
if (strncmp(ent->d_name, walPrefix, plen) == 0) {
midx++;
snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name);
snprintf(nname, sizeof(nname), "%s/old/wal%d", path, midx);
if (taosMkDir(opath, 0755) != 0) {
wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (rename(oname, nname) < 0) {
wError("wal:%s, failed to move to new:%s", oname, nname);
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
count++;
}
wDebug("wal:%s, %d files are moved for restoration", path, count);
@ -563,4 +559,30 @@ int64_t walGetVersion(twalh param) {
if (pWal == 0) return 0;
return pWal->version;
}
static int walGetMaxOldFileId(char *odir) {
int midx = 0;
DIR * dir = NULL;
struct dirent *dp = NULL;
int plen = strlen(walPrefix);
if (access(odir, F_OK) != 0) return midx;
dir = opendir(odir);
if (dir == NULL) {
wError("failed to open directory %s since %s", odir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, walPrefix, plen) == 0) {
int idx = atol(dp->d_name + plen);
if (midx < idx) midx = idx;
}
}
closedir(dir);
return midx;
}

View File

@ -108,9 +108,9 @@ void parseArg(int argc, char *argv[]) {
}
}
void taos_error(TAOS *con) {
printf("TDengine error: %s\n", taos_errstr(con));
taos_close(con);
static void taos_error(TAOS_RES *tres, TAOS *conn) {
printf("TDengine error: %s\n", tres?taos_errstr(tres):"null result");
taos_close(conn);
exit(1);
}
@ -125,13 +125,17 @@ void writeDataImp(void *param) {
printf("Thread %d, writing sID %d, eID %d\n", pThread->threadId, pThread->sID, pThread->eID);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
if (taos == NULL) {
// where to find errstr?
// taos_error(NULL, taos);
printf("TDengine error: %s\n", "failed to connect");
exit(1);
}
TAOS_RES* result = taos_query(taos, "use db");
int32_t code = taos_errno(result);
if (code != 0) {
taos_error(taos);
taos_error(result, taos);
}
taos_free_result(result);
@ -227,12 +231,17 @@ void writeData() {
taos_init();
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL) taos_error(taos);
if (taos == NULL) {
// where to find errstr?
// taos_error(NULL, taos);
printf("TDengine error: %s\n", "failed to connect");
exit(1);
}
TAOS_RES *result = taos_query(taos, "create database if not exists db");
int32_t code = taos_errno(result);
if (code != 0) {
taos_error(taos);
taos_error(result, taos);
}
taos_free_result(result);
@ -241,7 +250,7 @@ void writeData() {
"tags(devid int, devname binary(16), devgroup int)");
code = taos_errno(result);
if (code != 0) {
taos_error(taos);
taos_error(result, taos);
}
taos_free_result(result);
@ -293,8 +302,12 @@ void readDataImp(void *param)
printf("open file %s success\n", arguments.sql);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
if (taos == NULL) {
// where to find errstr?
// taos_error(NULL, taos);
printf("TDengine error: %s\n", "failed to connect");
exit(1);
}
char *line = NULL;
size_t len = 0;
@ -313,7 +326,7 @@ void readDataImp(void *param)
TAOS_RES *result = taos_query(taos, line);
int32_t code = taos_errno(result);
if (code != 0) {
taos_error(taos);
taos_error(result, taos);
}
TAOS_ROW row;
@ -343,8 +356,12 @@ void readData() {
printf("---- clients: %d\n", arguments.clients);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
if (taos == NULL) {
// where to find errstr?
// taos_error(NULL, taos);
printf("TDengine error: %s\n", "failed to connect");
exit(1);
}
ThreadObj *threads = calloc((size_t)arguments.clients, sizeof(ThreadObj));

View File

@ -38,7 +38,7 @@ int main(int argc, char *argv[]) {
taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", taos_errstr(taos));
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/);
exit(1);
}
printf("success to connect to server\n");
@ -48,7 +48,7 @@ int main(int argc, char *argv[]) {
result = taos_query(taos, "create database demo");
if (result == NULL) {
printf("failed to create database, reason:%s\n", taos_errstr(taos));
printf("failed to create database, reason:%s\n", "null result"/*taos_errstr(taos)*/);
exit(1);
}
printf("success to create database\n");
@ -57,7 +57,7 @@ int main(int argc, char *argv[]) {
// create table
if (taos_query(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))") == 0) {
printf("failed to create table, reason:%s\n", taos_errstr(taos));
printf("failed to create table, reason:%s\n", taos_errstr(result));
exit(1);
}
printf("success to create table\n");
@ -70,9 +70,19 @@ int main(int argc, char *argv[]) {
for (i = 0; i < 10; ++i) {
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", 1546300800000 + i * 1000, i, i, i, i*10000000, i*1.0, i*2.0, "hello");
printf("qstr: %s\n", qstr);
if (taos_query(taos, qstr)) {
printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
// note: how do you wanna do if taos_query returns non-NULL
// if (taos_query(taos, qstr)) {
// printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
// }
TAOS_RES *result = taos_query(taos, qstr);
if (result) {
printf("insert row: %i\n", i);
} else {
printf("failed to insert row: %i, reason:%s\n", i, "null result"/*taos_errstr(result)*/);
exit(1);
}
//sleep(1);
}
printf("success to insert rows, total %d rows\n", i);

View File

@ -308,12 +308,6 @@ extern "C" {
extern "C" {
pub fn taos_unsubscribe(tsub: *mut ::std::os::raw::c_void);
}
extern "C" {
pub fn taos_subfields_count(tsub: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_fetch_subfields(tsub: *mut ::std::os::raw::c_void) -> *mut TAOS_FIELD;
}
extern "C" {
pub fn taos_open_stream(
taos: *mut ::std::os::raw::c_void,

View File

@ -37,16 +37,16 @@ impl Subscriber {
println!("subscribed to {} user:{}, db:{}, tb:{}, time:{}, mseconds:{}",
host, username, db, table, time, mseconds);
let mut fields = taos_fetch_subfields(tsub);
let mut fields = taos_fetch_fields(tsub);
if fields.is_null() {
taos_unsubscribe(tsub);
return Err("fetch subfields error")
return Err("fetch fields error")
}
let fcount = taos_subfields_count(tsub);
let fcount = taos_field_count(tsub);
if fcount == 0 {
taos_unsubscribe(tsub);
return Err("subfields count is 0")
return Err("fields count is 0")
}
Ok(Subscriber{tsub, fields, fcount})
@ -74,4 +74,4 @@ impl Drop for Subscriber {
fn drop(&mut self) {
unsafe {taos_unsubscribe(self.tsub);}
}
}
}

3
tests/gotest/batchtest.bat Normal file → Executable file
View File

@ -7,6 +7,9 @@ set serverPort=%2
if "%severIp%"=="" (set severIp=127.0.0.1)
if "%serverPort%"=="" (set serverPort=6030)
go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.io,direct
cd case001
case001.bat %severIp% %serverPort%

3
tests/gotest/batchtest.sh Normal file → Executable file
View File

@ -13,6 +13,9 @@ if [ ! -n "$serverPort" ]; then
serverPort=6030
fi
go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.io,direct
bash ./case001/case001.sh $severIp $serverPort
#bash ./case002/case002.sh $severIp $serverPort
#bash ./case003/case003.sh $severIp $serverPort

View File

@ -0,0 +1,57 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
import time
class ClusterTestcase:
## test case 32 ##
def run(self):
nodes = Nodes()
nodes.addConfigs("maxVgroupsPerDb", "10")
nodes.addConfigs("maxTablesPerVnode", "1000")
nodes.restartAllTaosd()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
tdSql.query("show vgroups")
dnodes = []
for i in range(10):
dnodes.append(int(tdSql.getData(i, 4)))
s = set(dnodes)
if len(s) < 3:
tdLog.exit("cluster is not balanced")
tdLog.info("cluster is balanced")
nodes.removeConfigs("maxVgroupsPerDb", "10")
nodes.removeConfigs("maxTablesPerVnode", "1000")
nodes.restartAllTaosd()
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,47 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 1, 33 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
tdSql.init(ctest.conn.cursor(), False)
## Test case 1 ##
tdLog.info("Test case 1 repeat %d times" % ctest.repeat)
for i in range(ctest.repeat):
tdLog.info("Start Round %d" % (i + 1))
replica = random.randint(1,3)
ctest.createSTable(replica)
ctest.run()
tdLog.sleep(10)
tdSql.query("select count(*) from %s.%s" %(ctest.dbName, ctest.stbName))
tdSql.checkData(0, 0, ctest.numberOfRecords * ctest.numberOfTables)
tdLog.info("Round %d completed" % (i + 1))
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,51 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 7, ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
tdSql.query("show vgroups")
for i in range(10):
tdSql.checkData(i, 5, "master")
tdSql.execute("alter database %s replica 2" % ctest.dbName)
tdLog.sleep(30)
tdSql.query("show vgroups")
for i in range(10):
tdSql.checkData(i, 5, "master")
tdSql.checkData(i, 7, "slave")
tdSql.execute("alter database %s replica 3" % ctest.dbName)
tdLog.sleep(30)
tdSql.query("show vgroups")
for i in range(10):
tdSql.checkData(i, 5, "master")
tdSql.checkData(i, 7, "slave")
tdSql.checkData(i, 9, "slave")
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,202 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import sys
sys.path.insert(0, os.getcwd())
from fabric import Connection
from util.sql import *
from util.log import *
import taos
import random
import threading
import logging
class Node:
def __init__(self, index, username, hostIP, hostName, password, homeDir):
self.index = index
self.username = username
self.hostIP = hostIP
self.hostName = hostName
self.homeDir = homeDir
self.conn = Connection("{}@{}".format(username, hostName), connect_kwargs={"password": "{}".format(password)})
def startTaosd(self):
try:
self.conn.run("sudo systemctl start taosd")
except Exception as e:
print("Start Taosd error for node %d " % self.index)
logging.exception(e)
def stopTaosd(self):
try:
self.conn.run("sudo systemctl stop taosd")
except Exception as e:
print("Stop Taosd error for node %d " % self.index)
logging.exception(e)
def restartTaosd(self):
try:
self.conn.run("sudo systemctl restart taosd")
except Exception as e:
print("Stop Taosd error for node %d " % self.index)
logging.exception(e)
def removeTaosd(self):
try:
self.conn.run("rmtaos")
except Exception as e:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
def installTaosd(self, packagePath):
self.conn.put(packagePath, self.homeDir)
self.conn.cd(self.homeDir)
self.conn.run("tar -zxf $(basename '%s')" % packagePath)
with self.conn.cd("TDengine-enterprise-server"):
self.conn.run("yes|./install.sh")
def configTaosd(self, taosConfigKey, taosConfigValue):
self.conn.run("sudo echo '%s %s' >> %s" % (taosConfigKey, taosConfigValue, "/etc/taos/taos.cfg"))
def removeTaosConfig(self, taosConfigKey, taosConfigValue):
self.conn.run("sudo sed -in-place -e '/%s %s/d' %s" % (taosConfigKey, taosConfigValue, "/etc/taos/taos.cfg"))
def configHosts(self, ip, name):
self.conn.run("echo '%s %s' >> %s" % (ip, name, '/etc/hosts'))
def removeData(self):
try:
self.conn.run("sudo rm -rf /var/lib/taos/*")
except Exception as e:
print("remove taosd data error for node %d " % self.index)
logging.exception(e)
def removeLog(self):
try:
self.conn.run("sudo rm -rf /var/log/taos/*")
except Exception as e:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
def removeDataForMnode(self):
try:
self.conn.run("sudo rm -rf /var/lib/taos/*")
except Exception as e:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
def removeDataForVnode(self, id):
try:
self.conn.run("sudo rm -rf /var/lib/taos/vnode%d/*.data" % id)
except Exception as e:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
class Nodes:
def __init__(self):
self.node1 = Node(1, 'ubuntu', '192.168.1.52', 'node1', 'tbase125!', '/home/ubuntu')
self.node2 = Node(2, 'ubuntu', '192.168.1.53', 'node2', 'tbase125!', '/home/ubuntu')
self.node3 = Node(3, 'ubuntu', '192.168.1.54', 'node3', 'tbase125!', '/home/ubuntu')
def stopAllTaosd(self):
self.node1.stopTaosd()
self.node2.stopTaosd()
self.node3.stopTaosd()
def startAllTaosd(self):
self.node1.startTaosd()
self.node2.startTaosd()
self.node3.startTaosd()
def restartAllTaosd(self):
self.node1.restartTaosd()
self.node2.restartTaosd()
self.node3.restartTaosd()
def addConfigs(self, configKey, configValue):
self.node1.configTaosd(configKey, configValue)
self.node2.configTaosd(configKey, configValue)
self.node3.configTaosd(configKey, configValue)
def removeConfigs(self, configKey, configValue):
self.node1.removeTaosConfig(configKey, configValue)
self.node2.removeTaosConfig(configKey, configValue)
self.node3.removeTaosConfig(configKey, configValue)
def removeAllDataFiles(self):
self.node1.removeData()
self.node2.removeData()
self.node3.removeData()
class ClusterTest:
def __init__(self, hostName):
self.host = hostName
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taos"
self.dbName = "mytest"
self.stbName = "meters"
self.numberOfThreads = 20
self.numberOfTables = 10000
self.numberOfRecords = 1000
self.tbPrefix = "t"
self.ts = 1538548685000
self.repeat = 1
def connectDB(self):
self.conn = taos.connect(
host=self.host,
user=self.user,
password=self.password,
config=self.config)
def createSTable(self, replica):
cursor = self.conn.cursor()
tdLog.info("drop database if exists %s" % self.dbName)
cursor.execute("drop database if exists %s" % self.dbName)
tdLog.info("create database %s replica %d" % (self.dbName, replica))
cursor.execute("create database %s replica %d" % (self.dbName, replica))
tdLog.info("use %s" % self.dbName)
cursor.execute("use %s" % self.dbName)
tdLog.info("drop table if exists %s" % self.stbName)
cursor.execute("drop table if exists %s" % self.stbName)
tdLog.info("create table %s(ts timestamp, current float, voltage int, phase int) tags(id int)" % self.stbName)
cursor.execute("create table %s(ts timestamp, current float, voltage int, phase int) tags(id int)" % self.stbName)
cursor.close()
def insertData(self, threadID):
print("Thread %d: starting" % threadID)
cursor = self.conn.cursor()
tablesPerThread = int(self.numberOfTables / self.numberOfThreads)
baseTableID = tablesPerThread * threadID
for i in range (tablesPerThread):
cursor.execute("create table %s%d using %s tags(%d)" % (self.tbPrefix, baseTableID + i, self.stbName, baseTableID + i))
query = "insert into %s%d values" % (self.tbPrefix, baseTableID + i)
base = self.numberOfRecords * i
for j in range(self.numberOfRecords):
query += "(%d, %f, %d, %d)" % (self.ts + base + j, random.random(), random.randint(210, 230), random.randint(0, 10))
cursor.execute(query)
cursor.close()
print("Thread %d: finishing" % threadID)
def run(self):
threads = []
tdLog.info("Inserting data")
for i in range(self.numberOfThreads):
thread = threading.Thread(target=self.insertData, args=(i,))
threads.append(thread)
thread.start()
for i in range(self.numberOfThreads):
threads[i].join()

View File

@ -0,0 +1,53 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 20, 21, 22 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(3)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
nodes.node2.stopTaosd()
tdSql.execute("use %s" % ctest.dbName)
tdSql.query("show vgroups")
vnodeID = tdSql.getData(0, 0)
nodes.node2.removeDataForVnode(vnodeID)
nodes.node2.startTaosd()
# Wait for vnode file to recover
for i in range(10):
tdSql.query("select count(*) from t0")
tdLog.sleep(10)
for i in range(10):
tdSql.query("select count(*) from t0")
tdSql.checkData(0, 0, 1000)
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,47 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
##Cover test case 5 ##
def run(self):
# cluster environment set up
nodes = Nodes()
nodes.addConfigs("maxVgroupsPerDb", "10")
nodes.addConfigs("maxTablesPerVnode", "1000")
nodes.restartAllTaosd()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
tdSql.error("create table tt1 using %s tags(1)" % ctest.stbName)
nodes.removeConfigs("maxVgroupsPerDb", "10")
nodes.removeConfigs("maxTablesPerVnode", "1000")
nodes.restartAllTaosd()
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,75 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 7, 10 ##
def run(self):
# cluster environment set up
tdLog.info("Test case 7, 10")
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
tdSql.init(ctest.conn.cursor(), False)
nodes.node1.stopTaosd()
tdSql.query("show dnodes")
tdSql.checkRows(3)
tdSql.checkData(0, 4, "offline")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "ready")
nodes.node1.startTaosd()
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "ready")
nodes.node2.stopTaosd()
tdSql.query("show dnodes")
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "offline")
tdSql.checkData(2, 4, "ready")
nodes.node2.startTaosd()
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "ready")
nodes.node3.stopTaosd()
tdSql.query("show dnodes")
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "offline")
nodes.node3.startTaosd()
tdSql.checkRows(3)
tdSql.checkData(0, 4, "ready")
tdSql.checkData(1, 4, "ready")
tdSql.checkData(2, 4, "ready")
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,54 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## cover test case 6, 8, 9, 11 ##
def run(self):
# cluster environment set up
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
tdSql.init(ctest.conn.cursor(), False)
nodes.addConfigs("offlineThreshold", "10")
nodes.removeAllDataFiles()
nodes.restartAllTaosd()
nodes.node3.stopTaosd()
tdLog.sleep(10)
tdSql.query("show dnodes")
tdSql.checkRows(3)
tdSql.checkData(2, 4, "offline")
tdLog.sleep(60)
tdSql.checkRows(3)
tdSql.checkData(2, 4, "dropping")
tdLog.sleep(300)
tdSql.checkRows(2)
nodes.removeConfigs("offlineThreshold", "10")
nodes.restartAllTaosd()
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,65 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 28, 29, 30, 31 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(3)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
nodes.node2.stopTaosd()
for i in range(100):
tdSql.execute("drop table t%d" % i)
nodes.node2.startTaosd()
tdSql.query("show tables")
tdSql.checkRows(9900)
nodes.node2.stopTaosd()
for i in range(10):
tdSql.execute("create table a%d using meters tags(2)" % i)
nodes.node2.startTaosd()
tdSql.query("show tables")
tdSql.checkRows(9910)
nodes.node2.stopTaosd()
tdSql.execute("alter table meters add col col6 int")
nodes.node2.startTaosd()
nodes.node2.stopTaosd()
tdSql.execute("drop database %s" % ctest.dbName)
nodes.node2.startTaosd()
tdSql.query("show databases")
tdSql.checkRows(0)
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,54 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
import time
class ClusterTestcase:
## test case 32 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
totalTime = 0
for i in range(10):
startTime = time.time()
tdSql.query("select * from %s" % ctest.stbName)
totalTime += time.time() - startTime
print("replica 1: avarage query time for %d records: %f seconds" % (ctest.numberOfTables * ctest.numberOfRecords,totalTime / 10))
tdSql.execute("alter database %s replica 3" % ctest.dbName)
tdLog.sleep(60)
totalTime = 0
for i in range(10):
startTime = time.time()
tdSql.query("select * from %s" % ctest.stbName)
totalTime += time.time() - startTime
print("replica 3: avarage query time for %d records: %f seconds" % (ctest.numberOfTables * ctest.numberOfRecords,totalTime / 10))
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,45 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 19 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
tdSql.init(ctest.conn.cursor(), False)
tdSql.query("show databases")
count = tdSql.queryRows;
nodes.stopAllTaosd()
nodes.node1.startTaosd()
tdSql.error("show databases")
nodes.node2.startTaosd()
tdSql.error("show databases")
nodes.node3.startTaosd()
tdLog.sleep(10)
tdSql.query("show databases")
tdSql.checkRows(count)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,48 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 17, 18 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.query("show databases")
count = tdSql.queryRows;
tdSql.execute("use %s" % ctest.dbName)
tdSql.execute("alter database %s replica 3" % ctest.dbName)
nodes.node2.stopTaosd()
nodes.node3.stopTaosd()
tdSql.error("show databases")
nodes.node2.startTaosd()
tdSql.error("show databases")
nodes.node3.startTaosd()
tdSql.query("show databases")
tdSql.checkRows(count)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,50 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from clusterSetup import *
from util.sql import tdSql
from util.log import tdLog
import random
class ClusterTestcase:
## test case 24, 25, 26, 27 ##
def run(self):
nodes = Nodes()
ctest = ClusterTest(nodes.node1.hostName)
ctest.connectDB()
ctest.createSTable(1)
ctest.run()
tdSql.init(ctest.conn.cursor(), False)
tdSql.execute("use %s" % ctest.dbName)
tdSql.execute("alter database %s replica 3" % ctest.dbName)
for i in range(100):
tdSql.execute("drop table t%d" % i)
for i in range(100):
tdSql.execute("create table a%d using meters tags(1)" % i)
tdSql.execute("alter table meters add col col5 int")
tdSql.execute("alter table meters drop col col5 int")
tdSql.execute("drop database %s" % ctest.dbName)
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
ct = ClusterTestcase()
ct.run()

View File

@ -0,0 +1,12 @@
python3 basicTest.py
python3 bananceTest.py
python3 changeReplicaTest.py
python3 dataFileRecoveryTest.py
python3 fullDnodesTest.py
python3 killAndRestartDnodesTest.py
python3 offlineThresholdTest.py
python3 oneReplicaOfflineTest.py
python3 queryTimeTest.py
python3 stopAllDnodesTest.py
python3 stopTwoDnodesTest.py
python3 syncingTest.py

View File

@ -54,6 +54,7 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd)
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
# Now we are all let, and let's see if we can find a crash. Note we pass all params
CRASH_GEN_EXEC=crash_gen_bootstrap.py
if [[ $1 == '--valgrind' ]]; then
shift
export PYTHONMALLOC=malloc
@ -66,14 +67,14 @@ if [[ $1 == '--valgrind' ]]; then
--leak-check=yes \
--suppressions=crash_gen/valgrind_taos.supp \
$PYTHON_EXEC \
./crash_gen/crash_gen.py $@ > $VALGRIND_OUT 2> $VALGRIND_ERR
$CRASH_GEN_EXEC $@ > $VALGRIND_OUT 2> $VALGRIND_ERR
elif [[ $1 == '--helgrind' ]]; then
shift
valgrind \
--tool=helgrind \
$PYTHON_EXEC \
./crash_gen/crash_gen.py $@
$CRASH_GEN_EXEC $@
else
$PYTHON_EXEC ./crash_gen/crash_gen.py $@
$PYTHON_EXEC $CRASH_GEN_EXEC $@
fi

View File

@ -0,0 +1,130 @@
<center><h1>User's Guide to the Crash_Gen Tool</h1></center>
# Introduction
To effectively test and debug our TDengine product, we have developed a simple tool to
exercise various functions of the system in a randomized fashion, hoping to expose
maximum number of problems, hopefully without a pre-determined scenario.
# Preparation
To run this tool, please ensure the followed preparation work is done first.
1. Fetch a copy of the TDengine source code, and build it successfully in the `build/`
directory
1. Ensure that the system has Python3.8 or above properly installed. We use
Ubuntu 20.04LTS as our own development environment, and suggest you also use such
an environment if possible.
# Simple Execution
To run the tool with the simplest method, follow the steps below:
1. Open a terminal window, start the `taosd` service in the `build/` directory
(or however you prefer to start the `taosd` service)
1. Open another terminal window, go into the `tests/pytest/` directory, and
run `./crash_gen.sh -p -t 3 -s 10` (change the two parameters here as you wish)
1. Watch the output to the end and see if you get a `SUCCESS` or `FAILURE`
That's it!
# Running Clusters
This tool also makes it easy to test/verify the clustering capabilities of TDengine. You
can start a cluster quite easily with the following command:
```
$ cd tests/pytest/
$ ./crash_gen.sh -e -o 3
```
The `-e` option above tells the tool to start the service, and do not run any tests, while
the `-o 3` option tells the tool to start 3 DNodes and join them together in a cluster.
Obviously you can adjust the the number here.
## Behind the Scenes
When the tool runs a cluster, it users a number of directories, each holding the information
for a single DNode, see:
```
$ ls build/cluster*
build/cluster_dnode_0:
cfg data log
build/cluster_dnode_1:
cfg data log
build/cluster_dnode_2:
cfg data log
```
Therefore, when something goes wrong and you want to reset everything with the cluster, simple
erase all the files:
```
$ rm -rf build/cluster_dnode_*
```
## Addresses and Ports
The DNodes in the cluster all binds the the `127.0.0.1` IP address (for now anyway), and
uses port 6030 for the first DNode, and 6130 for the 2nd one, and so on.
## Testing Against a Cluster
In a separate terminal window, you can invoke the tool in client mode and test against
a cluster, such as:
```
$ ./crash_gen.sh -p -t 10 -s 100 -i 3
```
Here the `-i` option tells the tool to always create tables with 3 replicas, and run
all tests against such tables.
# Additional Features
The exhaustive features of the tool is available through the `-h` option:
```
$ ./crash_gen.sh -h
usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS] [-i MAX_REPLICAS] [-l] [-n] [-o NUM_DNODES] [-p] [-r]
[-s MAX_STEPS] [-t NUM_THREADS] [-v] [-x]
TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
---------------------------------------------------------------------
1. You build TDengine in the top level ./build directory, as described in offical docs
2. You run the server there before this script: ./build/bin/taosd -c test/cfg
optional arguments:
-h, --help show this help message and exit
-a, --auto-start-service
Automatically start/stop the TDengine service (default: false)
-b MAX_DBS, --max-dbs MAX_DBS
Maximum number of DBs to keep, set to disable dropping DB. (default: 0)
-c CONNECTOR_TYPE, --connector-type CONNECTOR_TYPE
Connector type to use: native, rest, or mixed (default: 10)
-d, --debug Turn on DEBUG mode for more logging (default: false)
-e, --run-tdengine Run TDengine service in foreground (default: false)
-g IGNORE_ERRORS, --ignore-errors IGNORE_ERRORS
Ignore error codes, comma separated, 0x supported (default: None)
-i MAX_REPLICAS, --max-replicas MAX_REPLICAS
Maximum number of replicas to use, when testing against clusters. (default: 1)
-l, --larger-data Write larger amount of data during write operations (default: false)
-n, --dynamic-db-table-names
Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)
-o NUM_DNODES, --num-dnodes NUM_DNODES
Number of Dnodes to initialize, used with -e option. (default: 1)
-p, --per-thread-db-connection
Use a single shared db connection (default: false)
-r, --record-ops Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)
-s MAX_STEPS, --max-steps MAX_STEPS
Maximum number of steps to run (default: 100)
-t NUM_THREADS, --num-threads NUM_THREADS
Number of threads to run (default: 10)
-v, --verify-data Verify data written in a number of places by reading back (default: false)
-x, --continue-on-exception
Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)
```

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,435 @@
from __future__ import annotations
import sys
import time
import threading
import requests
from requests.auth import HTTPBasicAuth
import taos
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from .misc import Logging, CrashGenError, Helper, Dice
import os
import datetime
# from .service_manager import TdeInstance
class DbConn:
TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api"
TYPE_INVALID = "invalid"
@classmethod
def create(cls, connType, dbTarget):
if connType == cls.TYPE_NATIVE:
return DbConnNative(dbTarget)
elif connType == cls.TYPE_REST:
return DbConnRest(dbTarget)
else:
raise RuntimeError(
"Unexpected connection type: {}".format(connType))
@classmethod
def createNative(cls, dbTarget) -> DbConn:
return cls.create(cls.TYPE_NATIVE, dbTarget)
@classmethod
def createRest(cls, dbTarget) -> DbConn:
return cls.create(cls.TYPE_REST, dbTarget)
def __init__(self, dbTarget):
self.isOpen = False
self._type = self.TYPE_INVALID
self._lastSql = None
self._dbTarget = dbTarget
def __repr__(self):
return "[DbConn: type={}, target={}]".format(self._type, self._dbTarget)
def getLastSql(self):
return self._lastSql
def open(self):
if (self.isOpen):
raise RuntimeError("Cannot re-open an existing DB connection")
# below implemented by child classes
self.openByType()
Logging.debug("[DB] data connection opened: {}".format(self))
self.isOpen = True
def close(self):
raise RuntimeError("Unexpected execution, should be overriden")
def queryScalar(self, sql) -> int:
return self._queryAny(sql)
def queryString(self, sql) -> str:
return self._queryAny(sql)
def _queryAny(self, sql): # actual query result as an int
if (not self.isOpen):
raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql)
if nRows != 1:
raise taos.error.ProgrammingError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows),
(0x991 if nRows==0 else 0x992)
)
if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
return self.getQueryResult()[0][0]
def use(self, dbName):
self.execute("use {}".format(dbName))
def existsDatabase(self, dbName: str):
''' Check if a certain database exists '''
self.query("show databases")
dbs = [v[0] for v in self.getQueryResult()] # ref: https://stackoverflow.com/questions/643823/python-list-transformation
# ret2 = dbName in dbs
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
return dbName in dbs # TODO: super weird type mangling seen, once here
def hasTables(self):
return self.query("show tables") > 0
def execute(self, sql):
''' Return the number of rows affected'''
raise RuntimeError("Unexpected execution, should be overriden")
def safeExecute(self, sql):
'''Safely execute any SQL query, returning True/False upon success/failure'''
try:
self.execute(sql)
return True # ignore num of results, return success
except taos.error.ProgrammingError as err:
return False # failed, for whatever TAOS reason
# Not possile to reach here, non-TAOS exception would have been thrown
def query(self, sql) -> int: # return num rows returned
''' Return the number of rows affected'''
raise RuntimeError("Unexpected execution, should be overriden")
def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getQueryResult(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultRows(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultCols(self):
raise RuntimeError("Unexpected execution, should be overriden")
# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
class DbConnRest(DbConn):
REST_PORT_INCREMENT = 11
def __init__(self, dbTarget: DbTarget):
super().__init__(dbTarget)
self._type = self.TYPE_REST
restPort = dbTarget.port + 11
self._url = "http://{}:{}/rest/sql".format(
dbTarget.hostAddr, dbTarget.port + self.REST_PORT_INCREMENT)
self._result = None
def openByType(self): # Open connection
pass # do nothing, always open
def close(self):
if (not self.isOpen):
raise RuntimeError("Cannot clean up database until connection is open")
# Do nothing for REST
Logging.debug("[DB] REST Database connection closed")
self.isOpen = False
def _doSql(self, sql):
self._lastSql = sql # remember this, last SQL attempted
try:
r = requests.post(self._url,
data = sql,
auth = HTTPBasicAuth('root', 'taosdata'))
except:
print("REST API Failure (TODO: more info here)")
raise
rj = r.json()
# Sanity check for the "Json Result"
if ('status' not in rj):
raise RuntimeError("No status in REST response")
if rj['status'] == 'error': # clearly reported error
if ('code' not in rj): # error without code
raise RuntimeError("REST error return without code")
errno = rj['code'] # May need to massage this in the future
# print("Raising programming error with REST return: {}".format(rj))
raise taos.error.ProgrammingError(
rj['desc'], errno) # todo: check existance of 'desc'
if rj['status'] != 'succ': # better be this
raise RuntimeError(
"Unexpected REST return status: {}".format(
rj['status']))
nRows = rj['rows'] if ('rows' in rj) else 0
self._result = rj
return nRows
def execute(self, sql):
if (not self.isOpen):
raise RuntimeError(
"Cannot execute database commands until connection is open")
Logging.debug("[SQL-REST] Executing SQL: {}".format(sql))
nRows = self._doSql(sql)
Logging.debug(
"[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
def query(self, sql): # return rows affected
return self.execute(sql)
def getQueryResult(self):
return self._result['data']
def getResultRows(self):
print(self._result)
raise RuntimeError("TBD") # TODO: finish here to support -v under -c rest
# return self._tdSql.queryRows
def getResultCols(self):
print(self._result)
raise RuntimeError("TBD")
# Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
class MyTDSql:
# Class variables
_clsLock = threading.Lock() # class wide locking
longestQuery = None # type: str
longestQueryTime = 0.0 # seconds
lqStartTime = 0.0
# lqEndTime = 0.0 # Not needed, as we have the two above already
def __init__(self, hostAddr, cfgPath):
# Make the DB connection
self._conn = taos.connect(host=hostAddr, config=cfgPath)
self._cursor = self._conn.cursor()
self.queryRows = 0
self.queryCols = 0
self.affectedRows = 0
# def init(self, cursor, log=True):
# self.cursor = cursor
# if (log):
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# self.cursor.log(caller.filename + ".sql")
def close(self):
self._cursor.close() # can we double close?
self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
self._cursor.close()
def _execInternal(self, sql):
startTime = time.time()
ret = self._cursor.execute(sql)
# print("\nSQL success: {}".format(sql))
queryTime = time.time() - startTime
# Record the query time
cls = self.__class__
if queryTime > (cls.longestQueryTime + 0.01) :
with cls._clsLock:
cls.longestQuery = sql
cls.longestQueryTime = queryTime
cls.lqStartTime = startTime
return ret
def query(self, sql):
self.sql = sql
try:
self._execInternal(sql)
self.queryResult = self._cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self._cursor.description)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
raise
return self.queryRows
def execute(self, sql):
self.sql = sql
try:
self.affectedRows = self._execInternal(sql)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
raise
return self.affectedRows
class DbTarget:
def __init__(self, cfgPath, hostAddr, port):
self.cfgPath = cfgPath
self.hostAddr = hostAddr
self.port = port
def __repr__(self):
return "[DbTarget: cfgPath={}, host={}:{}]".format(
Helper.getFriendlyPath(self.cfgPath), self.hostAddr, self.port)
def getEp(self):
return "{}:{}".format(self.hostAddr, self.port)
class DbConnNative(DbConn):
# Class variables
_lock = threading.Lock()
# _connInfoDisplayed = False # TODO: find another way to display this
totalConnections = 0 # Not private
def __init__(self, dbTarget):
super().__init__(dbTarget)
self._type = self.TYPE_NATIVE
self._conn = None
# self._cursor = None
def openByType(self): # Open connection
# global gContainer
# tInst = tInst or gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance
# cfgPath = self.getBuildPath() + "/test/cfg"
# cfgPath = tInst.getCfgDir()
# hostAddr = tInst.getHostAddr()
cls = self.__class__ # Get the class, to access class variables
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
dbTarget = self._dbTarget
# if not cls._connInfoDisplayed:
# cls._connInfoDisplayed = True # updating CLASS variable
Logging.debug("Initiating TAOS native connection to {}".format(dbTarget))
# Make the connection
# self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
# self._cursor = self._conn.cursor()
# Record the count in the class
self._tdSql = MyTDSql(dbTarget.hostAddr, dbTarget.cfgPath) # making DB connection
cls.totalConnections += 1
self._tdSql.execute('reset query cache')
# self._cursor.execute('use db') # do this at the beginning of every
# Open connection
# self._tdSql = MyTDSql()
# self._tdSql.init(self._cursor)
def close(self):
if (not self.isOpen):
raise RuntimeError("Cannot clean up database until connection is open")
self._tdSql.close()
# Decrement the class wide counter
cls = self.__class__ # Get the class, to access class variables
with cls._lock:
cls.totalConnections -= 1
Logging.debug("[DB] Database connection closed")
self.isOpen = False
def execute(self, sql):
if (not self.isOpen):
raise RuntimeError("Cannot execute database commands until connection is open")
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.execute(sql)
Logging.debug(
"[SQL] Execution Result, nRows = {}, SQL = {}".format(
nRows, sql))
return nRows
def query(self, sql): # return rows affected
if (not self.isOpen):
raise RuntimeError(
"Cannot query database until connection is open")
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.query(sql)
Logging.debug(
"[SQL] Query Result, nRows = {}, SQL = {}".format(
nRows, sql))
return nRows
# results are in: return self._tdSql.queryResult
def getQueryResult(self):
return self._tdSql.queryResult
def getResultRows(self):
return self._tdSql.queryRows
def getResultCols(self):
return self._tdSql.queryCols
class DbManager():
''' This is a wrapper around DbConn(), to make it easier to use.
TODO: rename this to DbConnManager
'''
def __init__(self, cType, dbTarget):
# self.tableNumQueue = LinearQueue() # TODO: delete?
# self.openDbServerConnection()
self._dbConn = DbConn.createNative(dbTarget) if (
cType == 'native') else DbConn.createRest(dbTarget)
try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if (err.msg == 'client disconnected'): # cannot open DB connection
print(
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit(2)
else:
print("Failed to connect to DB, errno = {}, msg: {}"
.format(Helper.convertErrno(err.errno), err.msg))
raise
except BaseException:
print("[=] Unexpected exception")
raise
# Do this after dbConn is in proper shape
# Moved to Database()
# self._stateMachine = StateMechine(self._dbConn)
def getDbConn(self):
return self._dbConn
# TODO: not used any more, to delete
def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate()
# TODO: Not used any more, to delete
def addTable(self):
with self._lock:
tIndex = self.tableNumQueue.push()
return tIndex
# Not used any more, to delete
def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i)
# TODO: not used any more, delete
def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition!
if (not tblNum): # maybe false
return False
return "table_{}".format(tblNum)
def cleanUp(self):
self._dbConn.close()

View File

@ -0,0 +1,175 @@
import threading
import random
import logging
import os
class CrashGenError(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self.errno = errno
def __str__(self):
return self.msg
class LoggingFilter(logging.Filter):
def filter(self, record: logging.LogRecord):
if (record.levelno >= logging.INFO):
return True # info or above always log
# Commenting out below to adjust...
# if msg.startswith("[TRD]"):
# return False
return True
class MyLoggingAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return "[{}] {}".format(threading.get_ident() % 10000, msg), kwargs
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
class Logging:
logger = None
@classmethod
def getLogger(cls):
return logger
@classmethod
def clsInit(cls, gConfig): # TODO: refactor away gConfig
if cls.logger:
return
# Logging Stuff
# global misc.logger
_logger = logging.getLogger('CrashGen') # real logger
_logger.addFilter(LoggingFilter())
ch = logging.StreamHandler()
_logger.addHandler(ch)
# Logging adapter, to be used as a logger
print("setting logger variable")
# global logger
cls.logger = MyLoggingAdapter(_logger, [])
if (gConfig.debug):
cls.logger.setLevel(logging.DEBUG) # default seems to be INFO
else:
cls.logger.setLevel(logging.INFO)
@classmethod
def info(cls, msg):
cls.logger.info(msg)
@classmethod
def debug(cls, msg):
cls.logger.debug(msg)
@classmethod
def warning(cls, msg):
cls.logger.warning(msg)
@classmethod
def error(cls, msg):
cls.logger.error(msg)
class Status:
STATUS_STARTING = 1
STATUS_RUNNING = 2
STATUS_STOPPING = 3
STATUS_STOPPED = 4
def __init__(self, status):
self.set(status)
def __repr__(self):
return "[Status: v={}]".format(self._status)
def set(self, status):
self._status = status
def get(self):
return self._status
def isStarting(self):
return self._status == Status.STATUS_STARTING
def isRunning(self):
# return self._thread and self._thread.is_alive()
return self._status == Status.STATUS_RUNNING
def isStopping(self):
return self._status == Status.STATUS_STOPPING
def isStopped(self):
return self._status == Status.STATUS_STOPPED
def isStable(self):
return self.isRunning() or self.isStopped()
# Deterministic random number generator
class Dice():
seeded = False # static, uninitialized
@classmethod
def seed(cls, s): # static
if (cls.seeded):
raise RuntimeError(
"Cannot seed the random generator more than once")
cls.verifyRNG()
random.seed(s)
cls.seeded = True # TODO: protect against multi-threading
@classmethod
def verifyRNG(cls): # Verify that the RNG is determinstic
random.seed(0)
x1 = random.randrange(0, 1000)
x2 = random.randrange(0, 1000)
x3 = random.randrange(0, 1000)
if (x1 != 864 or x2 != 394 or x3 != 776):
raise RuntimeError("System RNG is not deterministic")
@classmethod
def throw(cls, stop): # get 0 to stop-1
return cls.throwRange(0, stop)
@classmethod
def throwRange(cls, start, stop): # up to stop-1
if (not cls.seeded):
raise RuntimeError("Cannot throw dice before seeding it")
return random.randrange(start, stop)
@classmethod
def choice(cls, cList):
return random.choice(cList)
class Helper:
@classmethod
def convertErrno(cls, errno):
return errno if (errno > 0) else 0x80000000 + errno
@classmethod
def getFriendlyPath(cls, path): # returns .../xxx/yyy
ht1 = os.path.split(path)
ht2 = os.path.split(ht1[0])
return ".../" + ht2[1] + '/' + ht1[1]
class Progress:
STEP_BOUNDARY = 0
BEGIN_THREAD_STEP = 1
END_THREAD_STEP = 2
SERVICE_HEART_BEAT= 3
tokens = {
STEP_BOUNDARY: '.',
BEGIN_THREAD_STEP: '[',
END_THREAD_STEP: '] ',
SERVICE_HEART_BEAT: '.Y.'
}
@classmethod
def emit(cls, token):
print(cls.tokens[token], end="", flush=True)

View File

@ -0,0 +1,729 @@
import os
import io
import sys
import threading
import signal
import logging
import time
import subprocess
from typing import IO, List
try:
import psutil
except:
print("Psutil module needed, please install: sudo pip3 install psutil")
sys.exit(-1)
from queue import Queue, Empty
from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from .db import DbConn, DbTarget
class TdeInstance():
"""
A class to capture the *static* information of a TDengine instance,
including the location of the various files/directories, and basica
configuration.
"""
@classmethod
def _getBuildPath(cls):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("communit")]
else:
projPath = selfPath[:selfPath.find("tests")]
buildPath = None
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
if buildPath == None:
raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
.format(selfPath, projPath))
return buildPath
def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030):
self._buildDir = self._getBuildPath()
self._subdir = '/' + subdir # TODO: tolerate "/"
self._port = port # TODO: support different IP address too
self._fepPort = fepPort
self._tInstNum = tInstNum
self._smThread = ServiceManagerThread()
def getDbTarget(self):
return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port)
def getPort(self):
return self._port
def __repr__(self):
return "[TdeInstance: {}, subdir={}]".format(
self._buildDir, Helper.getFriendlyPath(self._subdir))
def generateCfgFile(self):
# print("Logger = {}".format(logger))
# buildPath = self.getBuildPath()
# taosdPath = self._buildPath + "/build/bin/taosd"
cfgDir = self.getCfgDir()
cfgFile = cfgDir + "/taos.cfg" # TODO: inquire if this is fixed
if os.path.exists(cfgFile):
if os.path.isfile(cfgFile):
Logging.warning("Config file exists already, skip creation: {}".format(cfgFile))
return # cfg file already exists, nothing to do
else:
raise CrashGenError("Invalid config file: {}".format(cfgFile))
# Now that the cfg file doesn't exist
if os.path.exists(cfgDir):
if not os.path.isdir(cfgDir):
raise CrashGenError("Invalid config dir: {}".format(cfgDir))
# else: good path
else:
os.makedirs(cfgDir, exist_ok=True) # like "mkdir -p"
# Now we have a good cfg dir
cfgValues = {
'runDir': self.getRunDir(),
'ip': '127.0.0.1', # TODO: change to a network addressable ip
'port': self._port,
'fepPort': self._fepPort,
}
cfgTemplate = """
dataDir {runDir}/data
logDir {runDir}/log
charset UTF-8
firstEp {ip}:{fepPort}
fqdn {ip}
serverPort {port}
# was all 135 below
dDebugFlag 135
cDebugFlag 135
rpcDebugFlag 135
qDebugFlag 135
# httpDebugFlag 143
# asyncLog 0
# tables 10
maxtablesPerVnode 10
rpcMaxTime 101
# cache 2
keep 36500
# walLevel 2
walLevel 1
#
# maxConnections 100
"""
cfgContent = cfgTemplate.format_map(cfgValues)
f = open(cfgFile, "w")
f.write(cfgContent)
f.close()
def rotateLogs(self):
logPath = self.getLogDir()
# ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
if os.path.exists(logPath):
logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
Logging.info("Saving old log files to: {}".format(logPathSaved))
os.rename(logPath, logPathSaved)
# os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms
def getExecFile(self): # .../taosd
return self._buildDir + "/build/bin/taosd"
def getRunDir(self): # TODO: rename to "root dir" ?!
return self._buildDir + self._subdir
def getCfgDir(self): # path, not file
return self.getRunDir() + "/cfg"
def getLogDir(self):
return self.getRunDir() + "/log"
def getHostAddr(self):
return "127.0.0.1"
def getServiceCmdLine(self): # to start the instance
return [self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
def _getDnodes(self, dbc):
dbc.query("show dnodes")
cols = dbc.getQueryResult() # id,end_point,vnodes,cores,status,role,create_time,offline reason
return {c[1]:c[4] for c in cols} # {'xxx:6030':'ready', 'xxx:6130':'ready'}
def createDnode(self, dbt: DbTarget):
"""
With a connection to the "first" EP, let's create a dnode for someone else who
wants to join.
"""
dbc = DbConn.createNative(self.getDbTarget())
dbc.open()
if dbt.getEp() in self._getDnodes(dbc):
Logging.info("Skipping DNode creation for: {}".format(dbt))
dbc.close()
return
sql = "CREATE DNODE \"{}\"".format(dbt.getEp())
dbc.execute(sql)
dbc.close()
def getStatus(self):
return self._smThread.getStatus()
def getSmThread(self):
return self._smThread
def start(self):
if not self.getStatus().isStopped():
raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus()))
Logging.info("Starting TDengine instance: {}".format(self))
self.generateCfgFile() # service side generates config file, client does not
self.rotateLogs()
self._smThread.start(self.getServiceCmdLine())
def stop(self):
self._smThread.stop()
def isFirst(self):
return self._tInstNum == 0
class TdeSubProcess:
"""
A class to to represent the actual sub process that is the run-time
of a TDengine instance.
It takes a TdeInstance object as its parameter, with the rationale being
"a sub process runs an instance".
"""
# RET_ALREADY_STOPPED = -1
# RET_TIME_OUT = -3
# RET_SUCCESS = -4
def __init__(self):
self.subProcess = None
# if tInst is None:
# raise CrashGenError("Empty instance not allowed in TdeSubProcess")
# self._tInst = tInst # Default create at ServiceManagerThread
def getStdOut(self):
return self.subProcess.stdout
def getStdErr(self):
return self.subProcess.stderr
def isRunning(self):
return self.subProcess is not None
def getPid(self):
return self.subProcess.pid
def start(self, cmdLine):
ON_POSIX = 'posix' in sys.builtin_module_names
# Sanity check
if self.subProcess: # already there
raise RuntimeError("Corrupt process state")
self.subProcess = subprocess.Popen(
cmdLine,
shell=False,
# svcCmdSingle, shell=True, # capture core dump?
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# bufsize=1, # not supported in binary mode
close_fds=ON_POSIX
) # had text=True, which interferred with reading EOF
def stop(self):
"""
Stop a sub process, and try to return a meaningful return code.
Common POSIX signal values (from man -7 signal):
SIGHUP 1
SIGINT 2
SIGQUIT 3
SIGILL 4
SIGTRAP 5
SIGABRT 6
SIGIOT 6
SIGBUS 7
SIGEMT -
SIGFPE 8
SIGKILL 9
SIGUSR1 10
SIGSEGV 11
SIGUSR2 12
"""
if not self.subProcess:
print("Sub process already stopped")
return # -1
retCode = self.subProcess.poll() # ret -N means killed with signal N, otherwise it's from exit(N)
if retCode: # valid return code, process ended
retCode = -retCode # only if valid
Logging.warning("TSP.stop(): process ended itself")
self.subProcess = None
return retCode
# process still alive, let's interrupt it
print("Terminate running process, send SIG_INT and wait...")
# sub process should end, then IPC queue should end, causing IO thread to end
self.subProcess.send_signal(signal.SIGINT)
self.subProcess.wait(20)
retCode = self.subProcess.returncode # should always be there
# May throw subprocess.TimeoutExpired exception above, therefore
# The process is guranteed to have ended by now
self.subProcess = None
if retCode != 0: # != (- signal.SIGINT):
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG_INT, retCode={}".format(retCode))
else:
Logging.info("TSP.stop(): sub proc successfully terminated with SIG_INT")
return - retCode
class ServiceManager:
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
def __init__(self, numDnodes): # >1 when we run a cluster
Logging.info("TDengine Service Manager (TSM) created")
self._numDnodes = numDnodes # >1 means we have a cluster
self._lock = threading.Lock()
# signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
# signal.signal(signal.SIGINT, self.sigIntHandler)
# signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
self.inSigHandler = False
# self._status = MainExec.STATUS_RUNNING # set inside
# _startTaosService()
self._runCluster = (numDnodes > 1)
self._tInsts : List[TdeInstance] = []
for i in range(0, numDnodes):
ti = self._createTdeInstance(i) # construct tInst
self._tInsts.append(ti)
# self.svcMgrThreads : List[ServiceManagerThread] = []
# for i in range(0, numDnodes):
# thread = self._createThread(i) # construct tInst
# self.svcMgrThreads.append(thread)
def _createTdeInstance(self, dnIndex):
if not self._runCluster: # single instance
subdir = 'test'
else: # Create all threads in a cluster
subdir = 'cluster_dnode_{}'.format(dnIndex)
fepPort= 6030 # firstEP Port
port = fepPort + dnIndex * 100
return TdeInstance(subdir, dnIndex, port, fepPort)
# return ServiceManagerThread(dnIndex, ti)
def _doMenu(self):
choice = ""
while True:
print("\nInterrupting Service Program, Choose an Action: ")
print("1: Resume")
print("2: Terminate")
print("3: Restart")
# Remember to update the if range below
# print("Enter Choice: ", end="", flush=True)
while choice == "":
choice = input("Enter Choice: ")
if choice != "":
break # done with reading repeated input
if choice in ["1", "2", "3"]:
break # we are done with whole method
print("Invalid choice, please try again.")
choice = "" # reset
return choice
def sigUsrHandler(self, signalNumber, frame):
print("Interrupting main thread execution upon SIGUSR1")
if self.inSigHandler: # already
print("Ignoring repeated SIG...")
return # do nothing if it's already not running
self.inSigHandler = True
choice = self._doMenu()
if choice == "1":
self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
elif choice == "2":
self.stopTaosServices()
elif choice == "3": # Restart
self.restart()
else:
raise RuntimeError("Invalid menu choice: {}".format(choice))
self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame):
print("ServiceManager: INT Signal Handler starting...")
if self.inSigHandler:
print("Ignoring repeated SIG_INT...")
return
self.inSigHandler = True
self.stopTaosServices()
print("ServiceManager: INT Signal Handler returning...")
self.inSigHandler = False
def sigHandlerResume(self):
print("Resuming TDengine service manager (main thread)...\n\n")
# def _updateThreadStatus(self):
# if self.svcMgrThread: # valid svc mgr thread
# if self.svcMgrThread.isStopped(): # done?
# self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
# self.svcMgrThread = None # no more
def isActive(self):
"""
Determine if the service/cluster is active at all, i.e. at least
one thread is not "stopped".
"""
for ti in self._tInsts:
if not ti.getStatus().isStopped():
return True
return False
# def isRestarting(self):
# """
# Determine if the service/cluster is being "restarted", i.e., at least
# one thread is in "restarting" status
# """
# for thread in self.svcMgrThreads:
# if thread.isRestarting():
# return True
# return False
def isStable(self):
"""
Determine if the service/cluster is "stable", i.e. all of the
threads are in "stable" status.
"""
for ti in self._tInsts:
if not ti.getStatus().isStable():
return False
return True
def _procIpcAll(self):
while self.isActive():
Progress.emit(Progress.SERVICE_HEART_BEAT)
for ti in self._tInsts: # all thread objects should always be valid
# while self.isRunning() or self.isRestarting() : # for as long as the svc mgr thread is still here
status = ti.getStatus()
if status.isRunning():
th = ti.getSmThread()
th.procIpcBatch() # regular processing,
if status.isStopped():
th.procIpcBatch() # one last time?
# self._updateThreadStatus()
time.sleep(self.PAUSE_BETWEEN_IPC_CHECK) # pause, before next round
# raise CrashGenError("dummy")
print("Service Manager Thread (with subprocess) ended, main thread exiting...")
def _getFirstInstance(self):
return self._tInsts[0]
def startTaosServices(self):
with self._lock:
if self.isActive():
raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")
# Find if there's already a taosd service, and then kill it
for proc in psutil.process_iter():
if proc.name() == 'taosd':
print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
time.sleep(2.0)
proc.kill()
# print("Process: {}".format(proc.name()))
# self.svcMgrThread = ServiceManagerThread() # create the object
for ti in self._tInsts:
ti.start()
if not ti.isFirst():
tFirst = self._getFirstInstance()
tFirst.createDnode(ti.getDbTarget())
ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
def stopTaosServices(self):
with self._lock:
if not self.isActive():
Logging.warning("Cannot stop TAOS service(s), already not active")
return
for ti in self._tInsts:
ti.stop()
def run(self):
self.startTaosServices()
self._procIpcAll() # pump/process all the messages, may encounter SIG + restart
if self.isActive(): # if sig handler hasn't destroyed it by now
self.stopTaosServices() # should have started already
def restart(self):
if not self.isStable():
Logging.warning("Cannot restart service/cluster, when not stable")
return
# self._isRestarting = True
if self.isActive():
self.stopTaosServices()
else:
Logging.warning("Service not active when restart requested")
self.startTaosServices()
# self._isRestarting = False
# def isRunning(self):
# return self.svcMgrThread != None
# def isRestarting(self):
# return self._isRestarting
class ServiceManagerThread:
"""
A class representing a dedicated thread which manages the "sub process"
of the TDengine service, interacting with its STDOUT/ERR.
It takes a TdeInstance parameter at creation time, or create a default
"""
MAX_QUEUE_SIZE = 10000
def __init__(self):
# Set the sub process
self._tdeSubProcess = None # type: TdeSubProcess
# Arrange the TDengine instance
# self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
# self._tInst = tInst or TdeInstance() # Need an instance
self._thread = None # The actual thread, # type: threading.Thread
self._status = Status(Status.STATUS_STOPPED) # The status of the underlying service, actually.
def __repr__(self):
return "[SvcMgrThread: status={}, subProc={}]".format(
self.getStatus(), self._tdeSubProcess)
def getStatus(self):
return self._status
# Start the thread (with sub process), and wait for the sub service
# to become fully operational
def start(self, cmdLine):
if self._thread:
raise RuntimeError("Unexpected _thread")
if self._tdeSubProcess:
raise RuntimeError("TDengine sub process already created/running")
Logging.info("Attempting to start TAOS service: {}".format(self))
self._status.set(Status.STATUS_STARTING)
self._tdeSubProcess = TdeSubProcess()
self._tdeSubProcess.start(cmdLine)
self._ipcQueue = Queue()
self._thread = threading.Thread( # First thread captures server OUTPUT
target=self.svcOutputReader,
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
self._thread.daemon = True # thread dies with the program
self._thread.start()
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
target=self.svcErrorReader,
args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
self._thread2.daemon = True # thread dies with the program
self._thread2.start()
# wait for service to start
for i in range(0, 100):
time.sleep(1.0)
# self.procIpcBatch() # don't pump message during start up
print("_zz_", end="", flush=True)
if self._status.isRunning():
Logging.info("[] TDengine service READY to process requests")
Logging.info("[] TAOS service started: {}".format(self))
# self._verifyDnode(self._tInst) # query and ensure dnode is ready
# Logging.debug("[] TAOS Dnode verified: {}".format(self))
return # now we've started
# TODO: handle failure-to-start better?
self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
raise RuntimeError("TDengine service did not start successfully: {}".format(self))
def _verifyDnode(self, tInst: TdeInstance):
dbc = DbConn.createNative(tInst.getDbTarget())
dbc.open()
dbc.query("show dnodes")
# dbc.query("DESCRIBE {}.{}".format(dbName, self._stName))
cols = dbc.getQueryResult() # id,end_point,vnodes,cores,status,role,create_time,offline reason
# ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
isValid = False
for col in cols:
# print("col = {}".format(col))
ep = col[1].split(':') # 10.1.30.2:6030
print("Found ep={}".format(ep))
if tInst.getPort() == int(ep[1]): # That's us
# print("Valid Dnode matched!")
isValid = True # now we are valid
break
if not isValid:
print("Failed to start dnode, sleep for a while")
time.sleep(600)
raise RuntimeError("Failed to start Dnode, expected port not found: {}".
format(tInst.getPort()))
dbc.close()
def stop(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
if self.getStatus().isStopped():
print("Service already stopped")
return
if self.getStatus().isStopping():
print("Service is already being stopped")
return
# Linux will send Control-C generated SIGINT to the TDengine process
# already, ref:
# https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
if not self._tdeSubProcess:
raise RuntimeError("sub process object missing")
self._status.set(Status.STATUS_STOPPING)
# retCode = self._tdeSubProcess.stop()
try:
retCode = self._tdeSubProcess.stop()
# print("Attempted to stop sub process, got return code: {}".format(retCode))
if retCode == signal.SIGSEGV : # SGV
Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
except subprocess.TimeoutExpired as err:
print("Time out waiting for TDengine service process to exit")
else:
if self._tdeSubProcess.isRunning(): # still running, should now never happen
print("FAILED to stop sub process, it is still running... pid = {}".format(
self._tdeSubProcess.getPid()))
else:
self._tdeSubProcess = None # not running any more
self.join() # stop the thread, change the status, etc.
# Check if it's really stopped
outputLines = 10 # for last output
if self.getStatus().isStopped():
self.procIpcBatch(outputLines) # one last time
Logging.debug("End of TDengine Service Output: {}".format(self))
Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n")
else:
print("WARNING: SMT did not terminate as expected: {}".format(self))
def join(self):
# TODO: sanity check
if not self.getStatus().isStopping():
raise RuntimeError(
"SMT.Join(): Unexpected status: {}".format(self._status))
if self._thread:
self._thread.join()
self._thread = None
self._status.set(Status.STATUS_STOPPED)
# STD ERR thread
self._thread2.join()
self._thread2 = None
else:
print("Joining empty thread, doing nothing")
def _trimQueue(self, targetSize):
if targetSize <= 0:
return # do nothing
q = self._ipcQueue
if (q.qsize() <= targetSize): # no need to trim
return
Logging.debug("Triming IPC queue to target size: {}".format(targetSize))
itemsToTrim = q.qsize() - targetSize
for i in range(0, itemsToTrim):
try:
q.get_nowait()
except Empty:
break # break out of for loop, no more trimming
TD_READY_MSG = "TDengine is initialized successfully"
def procIpcBatch(self, trimToTarget=0, forceOutput=False):
self._trimQueue(trimToTarget) # trim if necessary
# Process all the output generated by the underlying sub process,
# managed by IO thread
print("<", end="", flush=True)
while True:
try:
line = self._ipcQueue.get_nowait() # getting output at fast speed
self._printProgress("_o")
except Empty:
# time.sleep(2.3) # wait only if there's no output
# no more output
print(".>", end="", flush=True)
return # we are done with THIS BATCH
else: # got line, printing out
if forceOutput:
Logging.info(line)
else:
Logging.debug(line)
print(">", end="", flush=True)
_ProgressBars = ["--", "//", "||", "\\\\"]
def _printProgress(self, msg): # TODO: assuming 2 chars
print(msg, end="", flush=True)
pBar = self._ProgressBars[Dice.throw(4)]
print(pBar, end="", flush=True)
print('\b\b\b\b', end="", flush=True)
def svcOutputReader(self, out: IO, queue):
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
# print("This is the svcOutput Reader...")
# for line in out :
for line in iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
# print("Adding item to queue...")
try:
line = line.decode("utf-8").rstrip()
except UnicodeError:
print("\nNon-UTF8 server output: {}\n".format(line))
# This might block, and then causing "out" buffer to block
queue.put(line)
self._printProgress("_i")
if self._status.isStarting(): # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1: # found
Logging.info("Waiting for the service to become FULLY READY")
time.sleep(1.0) # wait for the server to truly start. TODO: remove this
Logging.info("Service is now FULLY READY") # TODO: more ID info here?
self._status.set(Status.STATUS_RUNNING)
# Trim the queue if necessary: TODO: try this 1 out of 10 times
self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size
if self._status.isStopping(): # TODO: use thread status instead
# WAITING for stopping sub process to finish its outptu
print("_w", end="", flush=True)
# queue.put(line)
# meaning sub process must have died
Logging.info("\nEnd of stream detected for TDengine STDOUT: {}".format(self))
out.close()
def svcErrorReader(self, err: IO, queue):
for line in iter(err.readline, b''):
print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
Logging.info("\nEnd of stream detected for TDengine STDERR: {}".format(self))
err.close()

View File

@ -0,0 +1,23 @@
# -----!/usr/bin/python3.7
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
import sys
from crash_gen.crash_gen import MainExec
if __name__ == "__main__":
mExec = MainExec()
mExec.init()
exitCode = mExec.run()
print("Exiting with code: {}".format(exitCode))
sys.exit(exitCode)

View File

@ -18,7 +18,9 @@ python3 ./test.py -f insert/multi.py
python3 ./test.py -f insert/randomNullCommit.py
python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f insert/insertIntoTwoTables.py
python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py

View File

@ -0,0 +1,53 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
### test case for TD-1758 ###
print("==============step1")
tdSql.execute(
"create table t0(ts timestamp, c int)")
tdSql.execute(
'create table t1(ts timestamp, c binary(1))')
tdSql.execute(
"insert into t0 values(now,1) t1 values(now,'0')(now+1a,'1')(now+2a,'2')(now+3a,'3')(now+4a,'4')")
print("==============step2")
tdSql.query("select * from t0")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 1)
tdSql.query("select * from t1")
tdSql.checkRows(5)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -96,6 +96,12 @@ class TDTestCase:
tdSql.query("select * from st order by ts desc")
self.checkColumnSorted(0, "desc")
print("======= step 2: verify order for special column =========")
tdSql.query("select tbcol1 from st order by ts desc")
tdSql.query("select tbcol6 from st order by ts desc")
for i in range(1, 10):
tdSql.error("select * from st order by tbcol%d" % i)
tdSql.error("select * from st order by tbcol%d asc" % i)

View File

@ -103,7 +103,7 @@ class TDTestCase:
tdSql.execute('alter table stb add tag tnc nchar(10)')
for tid in range(1, self.ntables + 1):
tdSql.execute('alter table tb%d set tag tnc=\"%s\"' %
(tid, str(tid * 1.2)))
(tid, str(tid + 1000000000)))
tdLog.info("insert %d data in to each %d tables" % (2, self.ntables))
for rid in range(self.rowsPerTable + 1, self.rowsPerTable + 3):
sqlcmd = ['insert into']

View File

@ -52,10 +52,10 @@ sleep 1000
sql use $db
sql drop table tb5
$i = 0
while $i < 4
$tbId = $i + $halfNum
$tb = $tbPrefix . $i
$tb = tb . $i
$x = 0
while $x < $rowNum
$xs = $x * $delta