Merge branch 'develop' into feature/query
This commit is contained in:
commit
df051e640d
|
@ -29,6 +29,31 @@ pipeline {
|
||||||
|
|
||||||
stage('Parallel test stage') {
|
stage('Parallel test stage') {
|
||||||
parallel {
|
parallel {
|
||||||
|
stage('pytest') {
|
||||||
|
agent{label 'master'}
|
||||||
|
steps {
|
||||||
|
sh '''
|
||||||
|
date
|
||||||
|
cd ${WKC}
|
||||||
|
git checkout develop
|
||||||
|
git pull
|
||||||
|
git submodule update
|
||||||
|
cd ${WK}
|
||||||
|
git checkout develop
|
||||||
|
git pull
|
||||||
|
export TZ=Asia/Harbin
|
||||||
|
date
|
||||||
|
rm -rf ${WK}/debug
|
||||||
|
mkdir debug
|
||||||
|
cd debug
|
||||||
|
cmake .. > /dev/null
|
||||||
|
make > /dev/null
|
||||||
|
cd ${WKC}/tests
|
||||||
|
#./test-all.sh smoke
|
||||||
|
./test-all.sh pytest
|
||||||
|
date'''
|
||||||
|
}
|
||||||
|
}
|
||||||
stage('test_b1') {
|
stage('test_b1') {
|
||||||
agent{label '184'}
|
agent{label '184'}
|
||||||
steps {
|
steps {
|
||||||
|
@ -105,30 +130,8 @@ pipeline {
|
||||||
make > /dev/null
|
make > /dev/null
|
||||||
cd ${WKC}/tests/pytest
|
cd ${WKC}/tests/pytest
|
||||||
./valgrind-test.sh 2>&1 > mem-error-out.log
|
./valgrind-test.sh 2>&1 > mem-error-out.log
|
||||||
grep \'start to execute\\|ERROR SUMMARY\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-mem-error-out.log
|
./handle_val_log.sh
|
||||||
|
|
||||||
for memError in `grep \'ERROR SUMMARY\' uniq-mem-error-out.log | awk \'{print $4}\'`
|
|
||||||
do
|
|
||||||
if [ -n "$memError" ]; then
|
|
||||||
if [ "$memError" -gt 12 ]; then
|
|
||||||
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\\
|
|
||||||
More than our threshold! ## ${NC}"
|
|
||||||
travis_terminate $memError
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
|
|
||||||
grep \'start to execute\\|definitely lost:\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-definitely-lost-out.log
|
|
||||||
for defiMemError in `grep \'definitely lost:\' uniq-definitely-lost-out.log | awk \'{print $7}\'`
|
|
||||||
do
|
|
||||||
if [ -n "$defiMemError" ]; then
|
|
||||||
if [ "$defiMemError" -gt 13 ]; then
|
|
||||||
echo -e "${RED} ## Memory errors number valgrind reports \\
|
|
||||||
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
|
|
||||||
travis_terminate $defiMemError
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
date
|
date
|
||||||
cd ${WKC}/tests
|
cd ${WKC}/tests
|
||||||
./test-all.sh b3
|
./test-all.sh b3
|
||||||
|
@ -140,4 +143,5 @@ pipeline {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
}
|
|
@ -16,6 +16,7 @@ ELSEIF (TD_WINDOWS)
|
||||||
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/tests/examples DESTINATION .)
|
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/tests/examples DESTINATION .)
|
||||||
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/packaging/cfg DESTINATION .)
|
INSTALL(DIRECTORY ${TD_COMMUNITY_DIR}/packaging/cfg DESTINATION .)
|
||||||
INSTALL(FILES ${TD_COMMUNITY_DIR}/src/inc/taos.h DESTINATION include)
|
INSTALL(FILES ${TD_COMMUNITY_DIR}/src/inc/taos.h DESTINATION include)
|
||||||
|
INSTALL(FILES ${TD_COMMUNITY_DIR}/src/inc/taoserror.h DESTINATION include)
|
||||||
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver)
|
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver)
|
||||||
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.exp DESTINATION driver)
|
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.exp DESTINATION driver)
|
||||||
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
|
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
|
||||||
|
|
|
@ -72,38 +72,34 @@ maven 项目中使用如下 pom.xml 配置即可:
|
||||||
|
|
||||||
### 获取连接
|
### 获取连接
|
||||||
|
|
||||||
如下所示配置即可获取 TDengine Connection:
|
#### 通过JdbcUrl获取连接
|
||||||
|
|
||||||
|
通过指定的jdbcUrl获取连接,如下所示:
|
||||||
```java
|
```java
|
||||||
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||||
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/log?user=root&password=taosdata";
|
String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/test?user=root&password=taosdata";
|
||||||
Connection conn = DriverManager.getConnection(jdbcUrl);
|
Connection conn = DriverManager.getConnection(jdbcUrl);
|
||||||
```
|
```
|
||||||
> 端口 6030 为默认连接端口,JDBC URL 中的 log 为系统本身的监控数据库。
|
以上示例,建立了到hostname为taosdemo.com,端口为6030(TDengine的默认端口),数据库名为test的连接。这个url中指定用户名(user)为root,密码(password)为taosdata。
|
||||||
|
|
||||||
TDengine 的 JDBC URL 规范格式为:
|
TDengine 的 JDBC URL 规范格式为:
|
||||||
`jdbc:TAOS://{host_ip}:{port}/[database_name]?[user={user}|&password={password}|&charset={charset}|&cfgdir={config_dir}|&locale={locale}|&timezone={timezone}]`
|
`jdbc:TAOS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&charset={charset}|&cfgdir={config_dir}|&locale={locale}|&timezone={timezone}]`
|
||||||
|
url中的配置参数如下:
|
||||||
其中,`{}` 中的内容必须,`[]` 中为可选。配置参数说明如下:
|
|
||||||
|
|
||||||
* user:登录 TDengine 用户名,默认值 root。
|
* user:登录 TDengine 用户名,默认值 root。
|
||||||
* password:用户登录密码,默认值 taosdata。
|
* password:用户登录密码,默认值 taosdata。
|
||||||
* charset:客户端使用的字符集,默认值为系统字符集。
|
|
||||||
* cfgdir:客户端配置文件目录路径,Linux OS 上默认值 /etc/taos ,Windows OS 上默认值 C:/TDengine/cfg。
|
* cfgdir:客户端配置文件目录路径,Linux OS 上默认值 /etc/taos ,Windows OS 上默认值 C:/TDengine/cfg。
|
||||||
|
* charset:客户端使用的字符集,默认值为系统字符集。
|
||||||
* locale:客户端语言环境,默认值系统当前 locale。
|
* locale:客户端语言环境,默认值系统当前 locale。
|
||||||
* timezone:客户端使用的时区,默认值为系统当前时区。
|
* timezone:客户端使用的时区,默认值为系统当前时区。
|
||||||
|
|
||||||
以上参数可以在 3 处配置,`优先级由高到低`分别如下:
|
#### 使用JdbcUrl和Properties获取连接
|
||||||
1. JDBC URL 参数
|
|
||||||
如上所述,可以在 JDBC URL 的参数中指定。
|
除了通过指定的jdbcUrl获取连接,还可以使用Properties指定建立连接时的参数,如下所示:
|
||||||
2. java.sql.DriverManager.getConnection(String jdbcUrl, Properties connProps)
|
|
||||||
```java
|
```java
|
||||||
public Connection getConn() throws Exception{
|
public Connection getConn() throws Exception{
|
||||||
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||||
String jdbcUrl = "jdbc:TAOS://127.0.0.1:0/log?user=root&password=taosdata";
|
String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/test?user=root&password=taosdata";
|
||||||
Properties connProps = new Properties();
|
Properties connProps = new Properties();
|
||||||
connProps.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
|
|
||||||
connProps.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
|
|
||||||
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR, "/etc/taos");
|
|
||||||
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
|
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
|
||||||
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
|
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
|
||||||
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
|
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
|
||||||
|
@ -111,16 +107,39 @@ public Connection getConn() throws Exception{
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
以上示例,建立一个到hostname为taosdemo.com,端口为6030,数据库名为test的连接。这个连接在url中指定了用户名(user)为root,密码(password)为taosdata,并在connProps中指定了使用的字符集、语言环境、时区等信息。
|
||||||
|
|
||||||
3. 客户端配置文件 taos.cfg
|
properties中的配置参数如下:
|
||||||
|
* TSDBDriver.PROPERTY_KEY_USER:登录 TDengine 用户名,默认值 root。
|
||||||
|
* TSDBDriver.PROPERTY_KEY_PASSWORD:用户登录密码,默认值 taosdata。
|
||||||
|
* TSDBDriver.PROPERTY_KEY_CONFIG_DIR:客户端配置文件目录路径,Linux OS 上默认值 /etc/taos ,Windows OS 上默认值 C:/TDengine/cfg。
|
||||||
|
* TSDBDriver.PROPERTY_KEY_CHARSET:客户端使用的字符集,默认值为系统字符集。
|
||||||
|
* TSDBDriver.PROPERTY_KEY_LOCALE:客户端语言环境,默认值系统当前 locale。
|
||||||
|
* TSDBDriver.PROPERTY_KEY_TIME_ZONE:客户端使用的时区,默认值为系统当前时区。
|
||||||
|
|
||||||
linux 系统默认配置文件为 /var/lib/taos/taos.cfg,windows 系统默认配置文件路径为 C:\TDengine\cfg\taos.cfg。
|
#### 使用客户端配置文件建立连接
|
||||||
```properties
|
当使用JDBC连接TDengine集群时,可以使用客户端配置文件,在客户端配置文件中指定集群的firstEp、secondEp参数。
|
||||||
# client default username
|
如下所示:
|
||||||
# defaultUser root
|
1. 在java中不指定hostname和port
|
||||||
|
```java
|
||||||
|
public Connection getConn() throws Exception{
|
||||||
|
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||||
|
String jdbcUrl = "jdbc:TAOS://:/test?user=root&password=taosdata";
|
||||||
|
Properties connProps = new Properties();
|
||||||
|
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
|
||||||
|
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
|
||||||
|
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
|
||||||
|
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
2. 在配置文件中指定firstEp和secondEp
|
||||||
|
```
|
||||||
|
# first fully qualified domain name (FQDN) for TDengine system
|
||||||
|
firstEp cluster_node1:6030
|
||||||
|
|
||||||
# client default password
|
# second fully qualified domain name (FQDN) for TDengine system, for cluster only
|
||||||
# defaultPass taosdata
|
secondEp cluster_node2:6030
|
||||||
|
|
||||||
# default system charset
|
# default system charset
|
||||||
# charset UTF-8
|
# charset UTF-8
|
||||||
|
@ -128,6 +147,19 @@ public Connection getConn() throws Exception{
|
||||||
# system locale
|
# system locale
|
||||||
# locale en_US.UTF-8
|
# locale en_US.UTF-8
|
||||||
```
|
```
|
||||||
|
|
||||||
|
以上示例,jdbc会使用客户端的配置文件,建立到hostname为cluster_node1,端口为6030,数据库名为test的连接。当集群中firstEp节点失效时,JDBC会尝试使用secondEp连接集群。
|
||||||
|
TDengine中,只要保证firstEp和secondEp中一个节点有效,就可以正常建立到集群的连接。
|
||||||
|
|
||||||
|
> 注意:这里的配置文件指的是调用JDBC Connector的应用程序所在机器上的配置文件,Linux OS 上默认值 /etc/taos/taos.cfg ,Windows OS 上默认值 C://TDengine/cfg/taos.cfg。
|
||||||
|
|
||||||
|
#### 配置参数的优先级
|
||||||
|
通过以上3种方式获取连接,如果配置参数在url、Properties、客户端配置文件中有重复,则参数的`优先级由高到低`分别如下:
|
||||||
|
1. JDBC URL 参数,如上所述,可以在 JDBC URL 的参数中指定。
|
||||||
|
2. Properties connProps
|
||||||
|
3. 客户端配置文件 taos.cfg
|
||||||
|
例如:在url中指定了password为taosdata,在Properties中指定了password为taosdemo,那么,JDBC会使用url中的password建立连接。
|
||||||
|
|
||||||
> 更多详细配置请参考[客户端配置][13]
|
> 更多详细配置请参考[客户端配置][13]
|
||||||
|
|
||||||
### 创建数据库和表
|
### 创建数据库和表
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM centos:7
|
FROM ubuntu:16
|
||||||
|
|
||||||
WORKDIR /root
|
WORKDIR /root
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
name: tdengine
|
name: tdengine
|
||||||
base: core18
|
base: core18
|
||||||
version: 'RELEASE_VERSION'
|
version: '2.0.5.1'
|
||||||
icon: snap/gui/t-dengine.svg
|
icon: snap/gui/t-dengine.svg
|
||||||
summary: an open-source big data platform designed and optimized for IoT.
|
summary: an open-source big data platform designed and optimized for IoT.
|
||||||
description: |
|
description: |
|
||||||
|
@ -72,7 +72,7 @@ parts:
|
||||||
- usr/bin/taosd
|
- usr/bin/taosd
|
||||||
- usr/bin/taos
|
- usr/bin/taos
|
||||||
- usr/bin/taosdemo
|
- usr/bin/taosdemo
|
||||||
- usr/lib/libtaos.so.RELEASE_VERSION
|
- usr/lib/libtaos.so.2.0.5.1
|
||||||
- usr/lib/libtaos.so.1
|
- usr/lib/libtaos.so.1
|
||||||
- usr/lib/libtaos.so
|
- usr/lib/libtaos.so
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tscProfile.h"
|
#include "tscProfile.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
static bool validImpl(const char* str, size_t maxsize) {
|
static bool validImpl(const char* str, size_t maxsize) {
|
||||||
if (str == NULL) {
|
if (str == NULL) {
|
||||||
|
@ -893,6 +894,8 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
pSql->fp = asyncCallback;
|
pSql->fp = asyncCallback;
|
||||||
pSql->fetchFp = asyncCallback;
|
pSql->fetchFp = asyncCallback;
|
||||||
pSql->param = pSql;
|
pSql->param = pSql;
|
||||||
|
|
||||||
|
registerSqlObj(pSql);
|
||||||
int code = tsParseSql(pSql, true);
|
int code = tsParseSql(pSql, true);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
tsem_wait(&pSql->rspSem);
|
tsem_wait(&pSql->rspSem);
|
||||||
|
|
|
@ -210,12 +210,12 @@ void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
|
||||||
|
|
||||||
static void *dnodeProcessWriteQueue(void *param) {
|
static void *dnodeProcessWriteQueue(void *param) {
|
||||||
SWriteWorker *pWorker = (SWriteWorker *)param;
|
SWriteWorker *pWorker = (SWriteWorker *)param;
|
||||||
SWriteMsg *pWrite;
|
SWriteMsg * pWrite;
|
||||||
SWalHead *pHead;
|
SWalHead * pHead;
|
||||||
int32_t numOfMsgs;
|
int32_t numOfMsgs;
|
||||||
int type;
|
int type;
|
||||||
void *pVnode, *item;
|
void * pVnode, *item;
|
||||||
SRspRet *pRspRet;
|
SRspRet * pRspRet;
|
||||||
|
|
||||||
dDebug("write worker:%d is running", pWorker->workerId);
|
dDebug("write worker:%d is running", pWorker->workerId);
|
||||||
|
|
||||||
|
@ -237,16 +237,21 @@ static void *dnodeProcessWriteQueue(void *param) {
|
||||||
pHead->msgType = pWrite->rpcMsg.msgType;
|
pHead->msgType = pWrite->rpcMsg.msgType;
|
||||||
pHead->version = 0;
|
pHead->version = 0;
|
||||||
pHead->len = pWrite->contLen;
|
pHead->len = pWrite->contLen;
|
||||||
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
|
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
|
||||||
|
taosMsg[pWrite->rpcMsg.msgType]);
|
||||||
} else {
|
} else {
|
||||||
pHead = (SWalHead *)item;
|
pHead = (SWalHead *)item;
|
||||||
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], pHead->version);
|
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
|
||||||
|
pHead->version);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
|
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
|
||||||
if (pWrite) {
|
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType],
|
||||||
|
pHead->version, tstrerror(code));
|
||||||
|
|
||||||
|
if (pWrite) {
|
||||||
pWrite->rpcMsg.code = code;
|
pWrite->rpcMsg.code = code;
|
||||||
if (code <= 0) pWrite->processedCount = 1;
|
if (code <= 0) pWrite->processedCount = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -258,7 +263,7 @@ static void *dnodeProcessWriteQueue(void *param) {
|
||||||
taosGetQitem(pWorker->qall, &type, &item);
|
taosGetQitem(pWorker->qall, &type, &item);
|
||||||
if (type == TAOS_QTYPE_RPC) {
|
if (type == TAOS_QTYPE_RPC) {
|
||||||
pWrite = (SWriteMsg *)item;
|
pWrite = (SWriteMsg *)item;
|
||||||
dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code);
|
dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code);
|
||||||
} else if (type == TAOS_QTYPE_FWD) {
|
} else if (type == TAOS_QTYPE_FWD) {
|
||||||
pHead = (SWalHead *)item;
|
pHead = (SWalHead *)item;
|
||||||
vnodeConfirmForward(pVnode, pHead->version, 0);
|
vnodeConfirmForward(pVnode, pHead->version, 0);
|
||||||
|
@ -279,13 +284,13 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
||||||
int32_t num = taosGetQueueNumber(pWorker->qset);
|
int32_t num = taosGetQueueNumber(pWorker->qset);
|
||||||
|
|
||||||
if (num > 0) {
|
if (num > 0) {
|
||||||
usleep(30000);
|
usleep(30000);
|
||||||
sched_yield();
|
sched_yield();
|
||||||
} else {
|
} else {
|
||||||
taosFreeQall(pWorker->qall);
|
taosFreeQall(pWorker->qall);
|
||||||
taosCloseQset(pWorker->qset);
|
taosCloseQset(pWorker->qset);
|
||||||
pWorker->qset = NULL;
|
pWorker->qset = NULL;
|
||||||
dDebug("write worker:%d is released", pWorker->workerId);
|
dDebug("write worker:%d is released", pWorker->workerId);
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -193,6 +193,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, 0, 0x0506, "No write p
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing data file")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing data file")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
|
||||||
|
|
||||||
|
@ -247,6 +248,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x080B, "CPU cores
|
||||||
// sync
|
// sync
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
|
||||||
|
|
|
@ -51,6 +51,7 @@ int walWrite(twalh, SWalHead *);
|
||||||
void walFsync(twalh);
|
void walFsync(twalh);
|
||||||
int walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
int walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
||||||
int walGetWalFile(twalh, char *name, uint32_t *index);
|
int walGetWalFile(twalh, char *name, uint32_t *index);
|
||||||
|
int64_t walGetVersion(twalh);
|
||||||
|
|
||||||
extern int wDebugFlag;
|
extern int wDebugFlag;
|
||||||
|
|
||||||
|
|
|
@ -594,7 +594,7 @@ static int sdbWrite(void *param, void *data, int type) {
|
||||||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||||
sdbError("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64,
|
sdbError("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64,
|
||||||
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
|
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
|
||||||
return TSDB_CODE_MND_APP_ERROR;
|
return TSDB_CODE_SYN_INVALID_VERSION;
|
||||||
} else {
|
} else {
|
||||||
tsSdbObj.version = pHead->version;
|
tsSdbObj.version = pHead->version;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,4 +4,4 @@ PROJECT(TDengine)
|
||||||
AUX_SOURCE_DIRECTORY(. SRC)
|
AUX_SOURCE_DIRECTORY(. SRC)
|
||||||
|
|
||||||
ADD_LIBRARY(os ${SRC})
|
ADD_LIBRARY(os ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(os m rt)
|
TARGET_LINK_LIBRARIES(os m rt z)
|
||||||
|
|
|
@ -313,7 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
|
||||||
|
|
||||||
// always update version
|
// always update version
|
||||||
nodeVersion = pWalHead->version;
|
nodeVersion = pWalHead->version;
|
||||||
sDebug("replica:%d nodeRole:%d qtype:%d", pNode->replica, nodeRole, qtype);
|
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
|
||||||
|
qtype, pWalHead->version);
|
||||||
|
|
||||||
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
|
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
|
||||||
|
|
||||||
|
@ -883,7 +884,7 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
|
||||||
SSyncNode * pNode = pPeer->pSyncNode;
|
SSyncNode * pNode = pPeer->pSyncNode;
|
||||||
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
|
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
|
||||||
|
|
||||||
sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
|
sDebug("%s, status msg is received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
|
||||||
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack);
|
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack);
|
||||||
|
|
||||||
pPeer->version = pPeersStatus->version;
|
pPeer->version = pPeersStatus->version;
|
||||||
|
@ -970,7 +971,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
|
||||||
|
|
||||||
int retLen = write(pPeer->peerFd, msg, statusMsgLen);
|
int retLen = write(pPeer->peerFd, msg, statusMsgLen);
|
||||||
if (retLen == statusMsgLen) {
|
if (retLen == statusMsgLen) {
|
||||||
sDebug("%s, status msg is sent", pPeer->id);
|
sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role],
|
||||||
|
pPeersStatus->version, pPeersStatus->ack);
|
||||||
} else {
|
} else {
|
||||||
sDebug("%s, failed to send status msg, restart", pPeer->id);
|
sDebug("%s, failed to send status msg, restart", pPeer->id);
|
||||||
syncRestartConnection(pPeer);
|
syncRestartConnection(pPeer);
|
||||||
|
|
|
@ -239,8 +239,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
|
|
||||||
code = vnodeReadVersion(pVnode);
|
code = vnodeReadVersion(pVnode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
vnodeCleanUp(pVnode);
|
vError("vgId:%d, failed to read version, generate it from data file", pVnode->vgId);
|
||||||
return code;
|
// Allow vnode start even when read version fails, set version as walVersion or zero
|
||||||
|
// vnodeCleanUp(pVnode);
|
||||||
|
// return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->fversion = pVnode->version;
|
pVnode->fversion = pVnode->version;
|
||||||
|
@ -292,6 +294,9 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
}
|
}
|
||||||
|
|
||||||
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
||||||
|
if (pVnode->version == 0) {
|
||||||
|
pVnode->version = walGetVersion(pVnode->wal);
|
||||||
|
}
|
||||||
|
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo;
|
||||||
syncInfo.vgId = pVnode->vgId;
|
syncInfo.vgId = pVnode->vgId;
|
||||||
|
@ -947,6 +952,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
||||||
len += snprintf(content + len, maxLen - len, "}\n");
|
len += snprintf(content + len, maxLen - len, "}\n");
|
||||||
|
|
||||||
fwrite(content, 1, len, fp);
|
fwrite(content, 1, len, fp);
|
||||||
|
fflush(fp);
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
|
|
||||||
vInfo("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->fversion);
|
vInfo("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->fversion);
|
||||||
|
@ -960,7 +966,7 @@ static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
|
||||||
cJSON *root = NULL;
|
cJSON *root = NULL;
|
||||||
int maxLen = 100;
|
int maxLen = 100;
|
||||||
|
|
||||||
terrno = TSDB_CODE_VND_APP_ERROR;
|
terrno = TSDB_CODE_VND_INVALID_VRESION_FILE;
|
||||||
sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
|
sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
|
||||||
FILE *fp = fopen(versionFile, "r");
|
FILE *fp = fopen(versionFile, "r");
|
||||||
if (!fp) {
|
if (!fp) {
|
||||||
|
@ -974,7 +980,7 @@ static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
content = calloc(1, maxLen + 1);
|
content = calloc(1, maxLen + 1);
|
||||||
int len = fread(content, 1, maxLen, fp);
|
int len = fread(content, 1, maxLen, fp);
|
||||||
if (len <= 0) {
|
if (len <= 0) {
|
||||||
vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
|
vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
|
@ -999,6 +1005,6 @@ static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
|
||||||
PARSE_OVER:
|
PARSE_OVER:
|
||||||
taosTFree(content);
|
taosTFree(content);
|
||||||
cJSON_Delete(root);
|
cJSON_Delete(root);
|
||||||
if(fp) fclose(fp);
|
if (fp) fclose(fp);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,18 +49,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
|
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
// tsdb may be in reset state
|
// tsdb may be in reset state
|
||||||
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
||||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
|
if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
|
||||||
|
|
||||||
// TODO: Later, let slave to support query
|
// TODO: Later, let slave to support query
|
||||||
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
|
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType],
|
||||||
|
pVnode->syncCfg.replica, syncRole[pVnode->role]);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
|
||||||
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
|
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, bool* freeHandle) {
|
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle) {
|
||||||
bool continueExec = false;
|
bool continueExec = false;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -106,55 +106,56 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) {
|
static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
|
||||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||||
pRet->len = sizeof(SRetrieveTableRsp);
|
pRet->len = sizeof(SRetrieveTableRsp);
|
||||||
|
|
||||||
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
SRetrieveTableRsp* pRsp = pRet->rsp;
|
SRetrieveTableRsp *pRsp = pRet->rsp;
|
||||||
|
|
||||||
pRsp->completed = true;
|
pRsp->completed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
void *pCont = pReadMsg->pCont;
|
void * pCont = pReadMsg->pCont;
|
||||||
int32_t contLen = pReadMsg->contLen;
|
int32_t contLen = pReadMsg->contLen;
|
||||||
SRspRet *pRet = &pReadMsg->rspRet;
|
SRspRet *pRet = &pReadMsg->rspRet;
|
||||||
|
|
||||||
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
|
SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pCont;
|
||||||
memset(pRet, 0, sizeof(SRspRet));
|
memset(pRet, 0, sizeof(SRspRet));
|
||||||
|
|
||||||
// qHandle needs to be freed correctly
|
// qHandle needs to be freed correctly
|
||||||
if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont;
|
SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont;
|
||||||
killQueryMsg->free = htons(killQueryMsg->free);
|
killQueryMsg->free = htons(killQueryMsg->free);
|
||||||
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
|
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
|
||||||
|
|
||||||
vWarn("QInfo:%p connection %p broken, kill query", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
|
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
|
||||||
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
|
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
|
||||||
|
|
||||||
void** qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) killQueryMsg->qhandle);
|
void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
|
||||||
if (qhandle == NULL || *qhandle == NULL) {
|
if (qhandle == NULL || *qhandle == NULL) {
|
||||||
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
|
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
|
||||||
|
pReadMsg->rpcMsg.handle);
|
||||||
} else {
|
} else {
|
||||||
assert(*qhandle == (void*) killQueryMsg->qhandle);
|
assert(*qhandle == (void *)killQueryMsg->qhandle);
|
||||||
|
|
||||||
qKillQuery(*qhandle);
|
qKillQuery(*qhandle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_TSC_QUERY_CANCELLED;
|
return TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
void** handle = NULL;
|
void ** handle = NULL;
|
||||||
|
|
||||||
if (contLen != 0) {
|
if (contLen != 0) {
|
||||||
qinfo_t pQInfo = NULL;
|
qinfo_t pQInfo = NULL;
|
||||||
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
|
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
|
||||||
|
|
||||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
pRsp->qhandle = 0;
|
pRsp->qhandle = 0;
|
||||||
|
|
||||||
pRet->len = sizeof(SQueryTableRsp);
|
pRet->len = sizeof(SQueryTableRsp);
|
||||||
|
@ -163,7 +164,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
// current connect is broken
|
// current connect is broken
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
|
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
|
||||||
if (handle == NULL) { // failed to register qhandle, todo add error test case
|
if (handle == NULL) { // failed to register qhandle, todo add error test case
|
||||||
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
||||||
tstrerror(pRsp->code));
|
tstrerror(pRsp->code));
|
||||||
|
@ -171,13 +172,15 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
qDestroyQueryInfo(pQInfo); // destroy it directly
|
qDestroyQueryInfo(pQInfo); // destroy it directly
|
||||||
} else {
|
} else {
|
||||||
assert(*handle == pQInfo);
|
assert(*handle == pQInfo);
|
||||||
pRsp->qhandle = htobe64((uint64_t) pQInfo);
|
pRsp->qhandle = htobe64((uint64_t)pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
if (handle != NULL &&
|
||||||
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
|
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||||
|
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
|
||||||
|
pReadMsg->rpcMsg.handle);
|
||||||
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
return pRsp->code;
|
return pRsp->code;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -190,12 +193,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
assert(pCont != NULL);
|
assert(pCont != NULL);
|
||||||
void** qhandle = (void**) pCont;
|
void **qhandle = (void **)pCont;
|
||||||
|
|
||||||
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
|
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
|
||||||
|
|
||||||
bool freehandle = false;
|
bool freehandle = false;
|
||||||
bool buildRes = qTableQuery(*qhandle); // do execute query
|
bool buildRes = qTableQuery(*qhandle); // do execute query
|
||||||
|
|
||||||
// build query rsp, the retrieve request has reached here already
|
// build query rsp, the retrieve request has reached here already
|
||||||
if (buildRes) {
|
if (buildRes) {
|
||||||
|
@ -233,16 +236,17 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
pRetrieve->free = htons(pRetrieve->free);
|
pRetrieve->free = htons(pRetrieve->free);
|
||||||
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
||||||
|
|
||||||
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle);
|
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
|
||||||
|
pRetrieve->free, pReadMsg->rpcMsg.handle);
|
||||||
|
|
||||||
memset(pRet, 0, sizeof(SRspRet));
|
memset(pRet, 0, sizeof(SRspRet));
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
|
void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
|
||||||
if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
|
if (handle == NULL || (*handle) != (void *)pRetrieve->qhandle) {
|
||||||
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);
|
vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void *)pRetrieve->qhandle);
|
||||||
|
|
||||||
vnodeBuildNoResultQueryRsp(pRet);
|
vnodeBuildNoResultQueryRsp(pRet);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -250,7 +254,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
if (pRetrieve->free == 1) {
|
if (pRetrieve->free == 1) {
|
||||||
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
||||||
qKillQuery(*handle);
|
qKillQuery(*handle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
|
|
||||||
vnodeBuildNoResultQueryRsp(pRet);
|
vnodeBuildNoResultQueryRsp(pRet);
|
||||||
code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||||
|
@ -259,26 +263,27 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
// register the qhandle to connect to quit query immediate if connection is broken
|
// register the qhandle to connect to quit query immediate if connection is broken
|
||||||
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||||
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
|
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle,
|
||||||
|
pReadMsg->rpcMsg.handle);
|
||||||
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
qKillQuery(*handle);
|
qKillQuery(*handle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool freeHandle = true;
|
bool freeHandle = true;
|
||||||
bool buildRes = false;
|
bool buildRes = false;
|
||||||
|
|
||||||
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
|
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
//TODO handle malloc failure
|
// TODO handle malloc failure
|
||||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||||
pRet->len = sizeof(SRetrieveTableRsp);
|
pRet->len = sizeof(SRetrieveTableRsp);
|
||||||
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
freeHandle = true;
|
freeHandle = true;
|
||||||
} else { // result is not ready, return immediately
|
} else { // result is not ready, return immediately
|
||||||
if (!buildRes) {
|
if (!buildRes) {
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
|
||||||
return TSDB_CODE_QRY_NOT_READY;
|
return TSDB_CODE_QRY_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,7 +293,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
|
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
|
||||||
// Here free qhandle immediately
|
// Here free qhandle immediately
|
||||||
if (freeHandle) {
|
if (freeHandle) {
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -296,13 +301,13 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
// notify connection(handle) that current qhandle is created, if current connection from
|
// notify connection(handle) that current qhandle is created, if current connection from
|
||||||
// client is broken, the query needs to be killed immediately.
|
// client is broken, the query needs to be killed immediately.
|
||||||
int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
|
int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
|
||||||
SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
|
SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
|
||||||
killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
|
killQueryMsg->qhandle = htobe64((uint64_t)qhandle);
|
||||||
killQueryMsg->free = htons(1);
|
killQueryMsg->free = htons(1);
|
||||||
killQueryMsg->header.vgId = htonl(vgId);
|
killQueryMsg->header.vgId = htonl(vgId);
|
||||||
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
|
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
|
||||||
|
|
||||||
vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
|
vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
|
||||||
return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
|
return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg));
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,11 +47,11 @@ void vnodeInitWriteFp(void) {
|
||||||
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param1;
|
SVnodeObj *pVnode = (SVnodeObj *)param1;
|
||||||
SWalHead *pHead = param2;
|
SWalHead * pHead = param2;
|
||||||
|
|
||||||
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
|
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
|
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
|
||||||
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
|
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
|
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
|
||||||
|
@ -59,28 +59,29 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
return TSDB_CODE_VND_NO_WRITE_AUTH;
|
return TSDB_CODE_VND_NO_WRITE_AUTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
// tsdb may be in reset state
|
// tsdb may be in reset state
|
||||||
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
||||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
|
if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
|
||||||
|
if (pHead->version == 0) { // from client or CQ
|
||||||
if (pHead->version == 0) { // from client or CQ
|
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status);
|
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
|
||||||
|
pVnode->status);
|
||||||
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
|
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role);
|
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType],
|
||||||
|
pVnode->syncCfg.replica, syncRole[pVnode->role]);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
// assign version
|
// assign version
|
||||||
pVnode->version++;
|
pVnode->version++;
|
||||||
pHead->version = pVnode->version;
|
pHead->version = pVnode->version;
|
||||||
if (pVnode->delay) usleep(pVnode->delay*1000);
|
if (pVnode->delay) usleep(pVnode->delay * 1000);
|
||||||
|
|
||||||
} else { // from wal or forward
|
} else { // from wal or forward
|
||||||
// for data from WAL or forward, version may be smaller
|
// for data from WAL or forward, version may be smaller
|
||||||
if (pHead->version <= pVnode->version) return 0;
|
if (pHead->version <= pVnode->version) return 0;
|
||||||
}
|
}
|
||||||
|
@ -91,12 +92,12 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
code = walWrite(pVnode->wal, pHead);
|
code = walWrite(pVnode->wal, pHead);
|
||||||
if (code < 0) return code;
|
if (code < 0) return code;
|
||||||
|
|
||||||
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
|
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
|
||||||
int32_t syncCode = 0;
|
int32_t syncCode = 0;
|
||||||
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
|
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
|
||||||
if (syncCode < 0) return syncCode;
|
if (syncCode < 0) return syncCode;
|
||||||
|
|
||||||
// write data locally
|
// write data locally
|
||||||
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
|
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
|
||||||
if (code < 0) return code;
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
@ -115,14 +116,14 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
|
||||||
|
|
||||||
// save insert result into item
|
// save insert result into item
|
||||||
SShellSubmitRspMsg *pRsp = NULL;
|
SShellSubmitRspMsg *pRsp = NULL;
|
||||||
if (pRet) {
|
if (pRet) {
|
||||||
pRet->len = sizeof(SShellSubmitRspMsg);
|
pRet->len = sizeof(SShellSubmitRspMsg);
|
||||||
pRet->rsp = rpcMallocCont(pRet->len);
|
pRet->rsp = rpcMallocCont(pRet->len);
|
||||||
pRsp = pRet->rsp;
|
pRsp = pRet->rsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;
|
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,7 +192,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
|
||||||
|
|
||||||
int vnodeWriteToQueue(void *param, void *data, int type) {
|
int vnodeWriteToQueue(void *param, void *data, int type) {
|
||||||
SVnodeObj *pVnode = param;
|
SVnodeObj *pVnode = param;
|
||||||
SWalHead *pHead = data;
|
SWalHead * pHead = data;
|
||||||
|
|
||||||
int size = sizeof(SWalHead) + pHead->len;
|
int size = sizeof(SWalHead) + pHead->len;
|
||||||
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
|
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
|
||||||
|
@ -204,4 +205,3 @@ int vnodeWriteToQueue(void *param, void *data, int type) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ static void walRelease(SWal *pWal);
|
||||||
|
|
||||||
static void walModuleInitFunc() {
|
static void walModuleInitFunc() {
|
||||||
walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
|
walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
|
||||||
if (walTmrCtrl == NULL)
|
if (walTmrCtrl == NULL)
|
||||||
walModuleInit = PTHREAD_ONCE_INIT;
|
walModuleInit = PTHREAD_ONCE_INIT;
|
||||||
else
|
else
|
||||||
wDebug("WAL module is initialized");
|
wDebug("WAL module is initialized");
|
||||||
|
@ -90,7 +90,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_fetch_32(&tsWalNum, 1);
|
atomic_add_fetch_32(&tsWalNum, 1);
|
||||||
pWal->fd = -1;
|
pWal->fd = -1;
|
||||||
pWal->max = pCfg->wals;
|
pWal->max = pCfg->wals;
|
||||||
pWal->id = 0;
|
pWal->id = 0;
|
||||||
|
@ -117,18 +117,17 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||||
walRelease(pWal);
|
walRelease(pWal);
|
||||||
pWal = NULL;
|
pWal = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCfg->keep == 1) return pWal;
|
if (pCfg->keep == 1) return pWal;
|
||||||
|
|
||||||
if (walHandleExistingFiles(path) == 0)
|
if (walHandleExistingFiles(path) == 0) walRenew(pWal);
|
||||||
walRenew(pWal);
|
|
||||||
|
|
||||||
if (pWal && pWal->fd <0) {
|
if (pWal && pWal->fd < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("wal:%s, failed to open(%s)", path, strerror(errno));
|
wError("wal:%s, failed to open(%s)", path, strerror(errno));
|
||||||
walRelease(pWal);
|
walRelease(pWal);
|
||||||
pWal = NULL;
|
pWal = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod);
|
if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod);
|
||||||
return pWal;
|
return pWal;
|
||||||
|
@ -154,7 +153,7 @@ int walAlter(twalh wal, const SWalCfg *pCfg) {
|
||||||
pWal->fsyncPeriod = pCfg->fsyncPeriod;
|
pWal->fsyncPeriod = pCfg->fsyncPeriod;
|
||||||
if (walNeedFsyncTimer(pWal)) {
|
if (walNeedFsyncTimer(pWal)) {
|
||||||
wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
|
wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
|
||||||
taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer,walTmrCtrl);
|
taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer, walTmrCtrl);
|
||||||
} else {
|
} else {
|
||||||
wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
|
wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
|
||||||
taosTmrStop(pWal->timer);
|
taosTmrStop(pWal->timer);
|
||||||
|
@ -167,16 +166,16 @@ int walAlter(twalh wal, const SWalCfg *pCfg) {
|
||||||
|
|
||||||
void walClose(void *handle) {
|
void walClose(void *handle) {
|
||||||
if (handle == NULL) return;
|
if (handle == NULL) return;
|
||||||
|
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
taosClose(pWal->fd);
|
taosClose(pWal->fd);
|
||||||
if (pWal->timer) taosTmrStopA(&pWal->timer);
|
if (pWal->timer) taosTmrStopA(&pWal->timer);
|
||||||
|
|
||||||
if (pWal->keep == 0) {
|
if (pWal->keep == 0) {
|
||||||
// remove all files in the directory
|
// remove all files in the directory
|
||||||
for (int i=0; i<pWal->num; ++i) {
|
for (int i = 0; i < pWal->num; ++i) {
|
||||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id-i);
|
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i);
|
||||||
if (remove(pWal->name) <0) {
|
if (remove(pWal->name) < 0) {
|
||||||
wError("wal:%s, failed to remove", pWal->name);
|
wError("wal:%s, failed to remove", pWal->name);
|
||||||
} else {
|
} else {
|
||||||
wDebug("wal:%s, it is removed", pWal->name);
|
wDebug("wal:%s, it is removed", pWal->name);
|
||||||
|
@ -197,7 +196,7 @@ int walRenew(void *handle) {
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
|
||||||
if (pWal->fd >=0) {
|
if (pWal->fd >= 0) {
|
||||||
close(pWal->fd);
|
close(pWal->fd);
|
||||||
pWal->id++;
|
pWal->id++;
|
||||||
wDebug("wal:%s, it is closed", pWal->name);
|
wDebug("wal:%s, it is closed", pWal->name);
|
||||||
|
@ -218,7 +217,7 @@ int walRenew(void *handle) {
|
||||||
// remove the oldest wal file
|
// remove the oldest wal file
|
||||||
char name[TSDB_FILENAME_LEN * 3];
|
char name[TSDB_FILENAME_LEN * 3];
|
||||||
snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
|
snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
|
||||||
if (remove(name) <0) {
|
if (remove(name) < 0) {
|
||||||
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
||||||
} else {
|
} else {
|
||||||
wDebug("wal:%s, it is removed", name);
|
wDebug("wal:%s, it is removed", name);
|
||||||
|
@ -226,8 +225,8 @@ int walRenew(void *handle) {
|
||||||
|
|
||||||
pWal->num--;
|
pWal->num--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
|
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -239,7 +238,7 @@ int walWrite(void *handle, SWalHead *pHead) {
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
// no wal
|
// no wal
|
||||||
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
||||||
if (pHead->version <= pWal->version) return 0;
|
if (pHead->version <= pWal->version) return 0;
|
||||||
|
|
||||||
|
@ -247,7 +246,7 @@ int walWrite(void *handle, SWalHead *pHead) {
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
|
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
|
||||||
int contLen = pHead->len + sizeof(SWalHead);
|
int contLen = pHead->len + sizeof(SWalHead);
|
||||||
|
|
||||||
if(taosTWrite(pWal->fd, pHead, contLen) != contLen) {
|
if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
|
||||||
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
|
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
} else {
|
} else {
|
||||||
|
@ -258,7 +257,6 @@ int walWrite(void *handle, SWalHead *pHead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void walFsync(void *handle) {
|
void walFsync(void *handle) {
|
||||||
|
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
|
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
|
||||||
|
|
||||||
|
@ -276,12 +274,11 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
uint32_t maxId = 0, minId = -1, index =0;
|
uint32_t maxId = 0, minId = -1, index =0;
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
int plen = strlen(walPrefix);
|
int plen = strlen(walPrefix);
|
||||||
char opath[TSDB_FILENAME_LEN+5];
|
char opath[TSDB_FILENAME_LEN + 5];
|
||||||
|
|
||||||
int slen = snprintf(opath, sizeof(opath), "%s", pWal->path);
|
int slen = snprintf(opath, sizeof(opath), "%s", pWal->path);
|
||||||
if ( pWal->keep == 0)
|
if (pWal->keep == 0) strcpy(opath + slen, "/old");
|
||||||
strcpy(opath+slen, "/old");
|
|
||||||
|
|
||||||
DIR *dir = opendir(opath);
|
DIR *dir = opendir(opath);
|
||||||
if (dir == NULL && errno == ENOENT) return 0;
|
if (dir == NULL && errno == ENOENT) return 0;
|
||||||
|
@ -290,8 +287,8 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((ent = readdir(dir))!= NULL) {
|
while ((ent = readdir(dir)) != NULL) {
|
||||||
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
if (strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||||
index = atol(ent->d_name + plen);
|
index = atol(ent->d_name + plen);
|
||||||
if (index > maxId) maxId = index;
|
if (index > maxId) maxId = index;
|
||||||
if (index < minId) minId = index;
|
if (index < minId) minId = index;
|
||||||
|
@ -306,13 +303,13 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( count != (maxId-minId+1) ) {
|
if (count != (maxId - minId + 1)) {
|
||||||
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
|
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
|
||||||
terrno = TSDB_CODE_WAL_APP_ERROR;
|
terrno = TSDB_CODE_WAL_APP_ERROR;
|
||||||
} else {
|
} else {
|
||||||
wDebug("wal:%s, %d files will be restored", opath, count);
|
wDebug("wal:%s, %d files will be restored", opath, count);
|
||||||
|
|
||||||
for (index = minId; index<=maxId; ++index) {
|
for (index = minId; index <= maxId; ++index) {
|
||||||
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index);
|
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index);
|
||||||
terrno = walRestoreWalFile(pWal, pVnode, writeFp);
|
terrno = walRestoreWalFile(pWal, pVnode, writeFp);
|
||||||
if (terrno < 0) break;
|
if (terrno < 0) break;
|
||||||
|
@ -328,7 +325,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// open the existing WAL file in append mode
|
// open the existing WAL file in append mode
|
||||||
pWal->num = count;
|
pWal->num = count;
|
||||||
pWal->id = maxId;
|
pWal->id = maxId;
|
||||||
|
@ -345,9 +342,9 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
}
|
}
|
||||||
|
|
||||||
int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
||||||
SWal *pWal = handle;
|
SWal * pWal = handle;
|
||||||
int code = 1;
|
int code = 1;
|
||||||
int32_t first = 0;
|
int32_t first = 0;
|
||||||
|
|
||||||
name[0] = 0;
|
name[0] = 0;
|
||||||
if (pWal == NULL || pWal->num == 0) return 0;
|
if (pWal == NULL || pWal->num == 0) return 0;
|
||||||
|
@ -359,18 +356,17 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
||||||
|
|
||||||
if (*index < first && *index > pWal->id) {
|
if (*index < first && *index > pWal->id) {
|
||||||
code = -1; // index out of range
|
code = -1; // index out of range
|
||||||
} else {
|
} else {
|
||||||
sprintf(name, "wal/%s%d", walPrefix, *index);
|
sprintf(name, "wal/%s%d", walPrefix, *index);
|
||||||
code = (*index == pWal->id) ? 0:1;
|
code = (*index == pWal->id) ? 0 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&(pWal->mutex));
|
pthread_mutex_unlock(&(pWal->mutex));
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void walRelease(SWal *pWal) {
|
static void walRelease(SWal *pWal) {
|
||||||
|
|
||||||
pthread_mutex_destroy(&pWal->mutex);
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
pWal->signature = NULL;
|
pWal->signature = NULL;
|
||||||
free(pWal);
|
free(pWal);
|
||||||
|
@ -385,12 +381,12 @@ static void walRelease(SWal *pWal) {
|
||||||
|
|
||||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
|
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
|
||||||
char *name = pWal->name;
|
char *name = pWal->name;
|
||||||
int size = 1024 * 1024; // default 1M buffer size
|
int size = 1024 * 1024; // default 1M buffer size
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
char *buffer = malloc(size);
|
char *buffer = malloc(size);
|
||||||
if (buffer == NULL) {
|
if (buffer == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -487,21 +483,21 @@ int walHandleExistingFiles(const char *path) {
|
||||||
} else {
|
} else {
|
||||||
// move all files to old directory
|
// move all files to old directory
|
||||||
int count = 0;
|
int count = 0;
|
||||||
while ((ent = readdir(dir))!= NULL) {
|
while ((ent = readdir(dir)) != NULL) {
|
||||||
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
if (strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||||
snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name);
|
snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name);
|
||||||
snprintf(nname, sizeof(nname), "%s/old/%s", path, ent->d_name);
|
snprintf(nname, sizeof(nname), "%s/old/%s", path, ent->d_name);
|
||||||
if (taosMkDir(opath, 0755) != 0) {
|
if (taosMkDir(opath, 0755) != 0) {
|
||||||
wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
|
wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rename(oname, nname) < 0) {
|
if (rename(oname, nname) < 0) {
|
||||||
wError("wal:%s, failed to move to new:%s", oname, nname);
|
wError("wal:%s, failed to move to new:%s", oname, nname);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
|
@ -509,34 +505,34 @@ int walHandleExistingFiles(const char *path) {
|
||||||
|
|
||||||
wDebug("wal:%s, %d files are moved for restoration", path, count);
|
wDebug("wal:%s, %d files are moved for restoration", path, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int walRemoveWalFiles(const char *path) {
|
static int walRemoveWalFiles(const char *path) {
|
||||||
int plen = strlen(walPrefix);
|
int plen = strlen(walPrefix);
|
||||||
char name[TSDB_FILENAME_LEN * 3];
|
char name[TSDB_FILENAME_LEN * 3];
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
struct dirent *ent;
|
struct dirent *ent;
|
||||||
DIR *dir = opendir(path);
|
DIR *dir = opendir(path);
|
||||||
if (dir == NULL && errno == ENOENT) return 0;
|
if (dir == NULL && errno == ENOENT) return 0;
|
||||||
if (dir == NULL) {
|
if (dir == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((ent = readdir(dir))!= NULL) {
|
while ((ent = readdir(dir)) != NULL) {
|
||||||
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
if (strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||||
snprintf(name, sizeof(name), "%s/%s", path, ent->d_name);
|
snprintf(name, sizeof(name), "%s/%s", path, ent->d_name);
|
||||||
if (remove(name) <0) {
|
if (remove(name) < 0) {
|
||||||
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
|
|
||||||
|
@ -561,3 +557,10 @@ static void walProcessFsyncTimer(void *param, void *tmrId) {
|
||||||
pWal->timer = NULL;
|
pWal->timer = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t walGetVersion(twalh param) {
|
||||||
|
SWal *pWal = param;
|
||||||
|
if (pWal == 0) return 0;
|
||||||
|
|
||||||
|
return pWal->version;
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
# Color setting
|
||||||
|
RED='\033[0;31m'
|
||||||
|
GREEN='\033[1;32m'
|
||||||
|
GREEN_DARK='\033[0;32m'
|
||||||
|
GREEN_UNDERLINE='\033[4;32m'
|
||||||
|
NC='\033[0m'
|
||||||
|
|
||||||
|
grep 'start to execute\|ERROR SUMMARY' mem-error-out.log|grep -v 'grep'|uniq|tee uniq-mem-error-out.log
|
||||||
|
|
||||||
|
for memError in `grep 'ERROR SUMMARY' uniq-mem-error-out.log | awk '{print $4}'`
|
||||||
|
do
|
||||||
|
if [ -n "$memError" ]; then
|
||||||
|
if [ "$memError" -gt 12 ]; then
|
||||||
|
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\
|
||||||
|
More than our threshold! ## ${NC}"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
grep 'start to execute\|definitely lost:' mem-error-out.log|grep -v 'grep'|uniq|tee uniq-definitely-lost-out.log
|
||||||
|
for defiMemError in `grep 'definitely lost:' uniq-definitely-lost-out.log | awk '{print $7}'`
|
||||||
|
do
|
||||||
|
if [ -n "$defiMemError" ]; then
|
||||||
|
if [ "$defiMemError" -gt 13 ]; then
|
||||||
|
echo -e "${RED} ## Memory errors number valgrind reports \
|
||||||
|
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
done
|
|
@ -121,7 +121,7 @@ if [ "$2" != "sim" ]; then
|
||||||
elif [ "$1" == "full" ]; then
|
elif [ "$1" == "full" ]; then
|
||||||
echo "### run Python full test ###"
|
echo "### run Python full test ###"
|
||||||
runPyCaseOneByOne fulltest.sh
|
runPyCaseOneByOne fulltest.sh
|
||||||
elif [ "$1" == "b1" ]; then
|
elif [ "$1" == "pytest" ]; then
|
||||||
echo "### run Python full test ###"
|
echo "### run Python full test ###"
|
||||||
runPyCaseOneByOne fulltest.sh
|
runPyCaseOneByOne fulltest.sh
|
||||||
elif [ "$1" == "b2" ] || [ "$1" == "b3" ]; then
|
elif [ "$1" == "b2" ] || [ "$1" == "b3" ]; then
|
||||||
|
|
Loading…
Reference in New Issue