Merge remote-tracking branch 'upstream/develop' into odbc
commit 941c80f34f
@ -56,6 +56,7 @@ matrix:
py3ver=`python3 --version|awk '{print $2}'|cut -d "." -f 1,2` && apt install python$py3ver-dev
pip3 install psutil
pip3 install guppy3
pip3 install --user ${TRAVIS_BUILD_DIR}/src/connector/python/linux/python3/

cd ${TRAVIS_BUILD_DIR}/tests

@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.0.4.0")
SET(TD_VER_NUMBER "2.0.5.1")
ENDIF ()

IF (DEFINED VERCOMPATIBLE)

@ -82,8 +82,7 @@ The background service of the TDengine system is provided by taosd, which can be modified in the configuration file taos.cfg

Only some important configuration parameters are listed below; for more parameters, see the notes in the configuration file itself. For a detailed description of each parameter and its effect, see the preceding sections. The default values of these parameters all work, so they normally do not need to be set. **Note: after a configuration change, the *taosd* service must be restarted for it to take effect.**

- firstEp: the end point of the first dnode in the cluster that taosd actively connects to at startup; the default value is localhost:6030.
- secondEp: the end point of the second dnode in the cluster that taosd tries to connect to at startup if the first one cannot be reached; the default value is empty.
- firstEp: the end point of the first dnode in the cluster that taosd actively connects to at startup; the default value is localhost:6030.
- fqdn: the FQDN of the data node; defaults to the first hostname configured by the operating system. If you prefer to access the node by IP address, this can be set to the node's IP address.
- serverPort: the port on which taosd serves external requests after startup; the default value is 6030.
- httpPort: the port used by the RESTful service; all HTTP (TCP) query/write requests are sent to this port. The default value is 6041.

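As a minimal sketch of how a client would use these network settings, the C snippet below connects to the firstEp/serverPort defaults described above (localhost:6030) with TDengine's stock `taos_connect()` API; the host, credentials, and port are assumptions to adjust for a real deployment.

```c
#include <stdio.h>
#include "taos.h"  // TDengine C client API

int main(void) {
  // Connect to the dnode advertised as firstEp, on the default serverPort 6030.
  // "root"/"taosdata" are the default credentials; replace them in production.
  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  if (conn == NULL) {
    fprintf(stderr, "failed to connect to taosd at localhost:6030\n");
    return 1;
  }

  printf("connected to taosd at localhost:6030\n");
  taos_close(conn);
  return 0;
}
```
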
@ -156,10 +155,9 @@ The interactive front-end client application of the TDengine system is taos; it shares the same
Client configuration parameters

- firstEp: the end point of the first taosd instance in the cluster that taos actively connects to at startup; the default value is localhost:6030.
- secondEp: the end point of the second taosd instance in the cluster that taos tries to connect to at startup if the first one cannot be reached; the default value is empty.
- locale

> Default: obtained dynamically from the system; if automatic detection fails, it must be set in the configuration file or through the API
Default: obtained dynamically from the system; if automatic detection fails, it must be set in the configuration file or through the API

TDengine provides a dedicated field type, nchar, for storing wide characters in non-ASCII encodings such as Chinese, Japanese, and Korean. Data written to an nchar field is encoded in UCS4-LE and sent to the server. Note that it is the client's responsibility to guarantee the correctness of the encoding; therefore, to store non-ASCII characters such as Chinese, Japanese, or Korean in nchar fields, the client's encoding must be configured correctly.

@ -169,7 +167,7 @@ To store wide characters in non-ASCII encodings such as Chinese, Japanese, and Korean, TDengine provides

- charset

> Default: obtained dynamically from the system; if automatic detection fails, it must be set in the configuration file or through the API
Default: obtained dynamically from the system; if automatic detection fails, it must be set in the configuration file or through the API

If charset is not set in the configuration file, then on Linux taos reads the system's current locale at startup and parses the charset encoding out of the locale information. If reading the locale fails, it falls back to reading the charset setting; if reading the charset setting also fails, startup is aborted.

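Where editing taos.cfg is inconvenient, the same locale/charset settings can also be supplied through the client API. The sketch below assumes the `taos_options()` call and the `TSDB_OPTION_LOCALE`/`TSDB_OPTION_CHARSET` options declared in `taos.h`, with example values for a typical Linux host; it is typically invoked before the first connection is opened.

```c
#include <stdio.h>
#include "taos.h"

int main(void) {
  // Configure the client-side locale and charset programmatically instead of
  // via taos.cfg; the values below are examples, not required settings.
  if (taos_options(TSDB_OPTION_LOCALE, "en_US.UTF-8") != 0) {
    fprintf(stderr, "failed to set client locale\n");
  }
  if (taos_options(TSDB_OPTION_CHARSET, "UTF-8") != 0) {
    fprintf(stderr, "failed to set client charset\n");
  }

  // A connection opened after this point will convert nchar data from the
  // configured charset to UCS4-LE before sending it to the server.
  return 0;
}
```
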
@ -227,6 +225,11 @@ The valid value of charset is UTF-8.

When taos is started, the end point of a taosd instance can also be specified on the command line; otherwise it is read from taos.cfg.

- maxBinaryDisplayWidth

The maximum display width of binary and nchar fields in the shell; anything beyond this limit is hidden. Default: 30. This option can be changed dynamically in the shell with the command set max_binary_display_width nn.

## User Management

The system administrator can add and delete users, and change passwords, from the CLI. The SQL syntax in the CLI is as follows:

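The SQL statements themselves fall outside this hunk, so they are not reproduced here. As a hedged illustration, assuming TDengine's standard `CREATE USER ... PASS ...` syntax, the same administrative step can also be issued from the C client with `taos_query()`; the user name and password below are purely illustrative.

```c
#include <stdio.h>
#include "taos.h"

// Sketch: add a user from the C client rather than the interactive CLI.
// Assumes the standard CREATE USER syntax and default root credentials.
int main(void) {
  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  if (conn == NULL) return 1;

  TAOS_RES *res = taos_query(conn, "CREATE USER demo_user PASS 'demo_pass'");
  if (taos_errno(res) != 0) {
    fprintf(stderr, "create user failed: %s\n", taos_errstr(res));
  } else {
    printf("user demo_user created\n");
  }
  taos_free_result(res);

  taos_close(conn);
  return 0;
}
```
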
@ -6,10 +6,7 @@
|
|||
########################################################
|
||||
|
||||
# first fully qualified domain name (FQDN) for TDengine system
|
||||
# firstEp hostname1:6030
|
||||
|
||||
# second fully qualified domain name (FQDN) for TDengine system, for cluster only
|
||||
# secondEp cluster_hostname2:6030
|
||||
# firstEp hostname:6030
|
||||
|
||||
# local fully qualified domain name (FQDN)
|
||||
# fqdn hostname
|
||||
|
@ -29,12 +26,18 @@
|
|||
# number of threads per CPU core
|
||||
# numOfThreadsPerCore 1.0
|
||||
|
||||
# the proportion of total threads responsible for query
|
||||
# ratioOfQueryThreads 0.5
|
||||
|
||||
# number of management nodes in the system
|
||||
# numOfMnodes 3
|
||||
|
||||
# enable/disable backing up the vnode directory when removing a dnode
|
||||
# vnodeBak 1
|
||||
|
||||
# if report installation / use information
|
||||
# telemetryReporting 1
|
||||
|
||||
# enable/disable load balancing
|
||||
# balance 1
|
||||
|
||||
|
@ -89,9 +92,6 @@
|
|||
# max number of tables per vnode
|
||||
# maxTablesPerVnode 1000000
|
||||
|
||||
# step size of increasing table number in a vnode
|
||||
# tableIncStepPerVnode 1000
|
||||
|
||||
# cache block size (Mbyte)
|
||||
# cache 16
|
||||
|
||||
|
@ -110,6 +110,9 @@
|
|||
# maximum rows of records in file block
|
||||
# maxRows 4096
|
||||
|
||||
# the number of acknowledgments required for successful data writing
|
||||
# quorum 1
|
||||
|
||||
# enable/disable compression
|
||||
# comp 2
|
||||
|
||||
|
@ -122,15 +125,6 @@
|
|||
# number of replications, for cluster only
|
||||
# replica 1
|
||||
|
||||
# mqtt hostname
|
||||
# mqttHostName test.mosquitto.org
|
||||
|
||||
# mqtt port
|
||||
# mqttPort 1883
|
||||
|
||||
# mqtt topic
|
||||
# mqttTopic /test
|
||||
|
||||
# the compressed rpc message, option:
|
||||
# -1 (no compression)
|
||||
# 0 (all message compressed),
|
||||
|
@ -167,12 +161,12 @@
|
|||
# stop writing data when the disk size of the log folder is less than this value
|
||||
# minimalDataDirGB 0.1
|
||||
|
||||
# One mnode is equal to the number of vnode consumed
|
||||
# mnodeEqualVnodeNum 4
|
||||
|
||||
# enable/disable http service
|
||||
# http 1
|
||||
|
||||
# enable/disable mqtt service
|
||||
# mqtt 0
|
||||
|
||||
# enable/disable system monitor
|
||||
# monitor 1
|
||||
|
||||
|
@ -189,11 +183,12 @@
|
|||
# max number of rows per log filters
|
||||
# numOfLogLines 10000000
|
||||
|
||||
# enable/disable async log
|
||||
# asyncLog 1
|
||||
|
||||
# time of keeping log files, days
|
||||
# logKeepDays 0
|
||||
|
||||
# enable/disable async log
|
||||
# asyncLog 1
|
||||
|
||||
# The following parameters are used for debug purpose only.
|
||||
# debugFlag 8 bits mask: FILE-SCREEN-UNUSED-HeartBeat-DUMP-TRACE_WARN-ERROR
|
||||
|
@ -231,18 +226,12 @@
|
|||
# debug flag for JNI
|
||||
# jniDebugflag 131
|
||||
|
||||
# debug flag for ODBC
|
||||
# odbcDebugflag 131
|
||||
|
||||
# debug flag for storage
|
||||
# uDebugflag 131
|
||||
|
||||
# debug flag for http server
|
||||
# httpDebugFlag 131
|
||||
|
||||
# debug flag for mqtt
|
||||
# mqttDebugFlag 131
|
||||
|
||||
# debug flag for monitor
|
||||
# monitorDebugFlag 131
|
||||
|
||||
|
@ -255,6 +244,9 @@
|
|||
# debug flag for http server
|
||||
# tsdbDebugFlag 131
|
||||
|
||||
# debug flag for continue query
|
||||
# cqDebugFlag 131
|
||||
|
||||
# enable/disable recording the SQL in taos client
|
||||
# tscEnableRecordSql 0
|
||||
|
||||
|
|
|
@ -2,9 +2,11 @@ FROM centos:7

WORKDIR /root

ARG version
RUN echo $version
COPY tdengine.tar.gz /root/
RUN tar -zxf tdengine.tar.gz
WORKDIR /root/TDengine-server/
WORKDIR /root/TDengine-server-$version/
RUN sh install.sh -e no


@ -1,5 +1,5 @@
#!/bin/bash
set -x
docker build --rm -f "Dockerfile" -t tdengine/tdengine:$1 "."
docker build --rm -f "Dockerfile" -t tdengine/tdengine:$1 "." --build-arg version=$1
docker login -u tdengine -p $2 #replace the docker registry username and password
docker push tdengine/tdengine:$1

|
@ -272,6 +272,29 @@ function install_config() {
|
|||
break
|
||||
fi
|
||||
done
|
||||
|
||||
# user email
|
||||
#EMAIL_PATTERN='^[A-Za-z0-9\u4e00-\u9fa5]+@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)+$'
|
||||
#EMAIL_PATTERN='^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$'
|
||||
#EMAIL_PATTERN="^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$"
|
||||
echo
|
||||
echo -e -n "${GREEN}Enter your email address for priority support or enter empty to skip${NC}: "
|
||||
read emailAddr
|
||||
while true; do
|
||||
if [ ! -z "$emailAddr" ]; then
|
||||
# check the format of the emailAddr
|
||||
#if [[ "$emailAddr" =~ $EMAIL_PATTERN ]]; then
|
||||
# Write the email address to temp file
|
||||
email_file="${install_main_dir}/email"
|
||||
${csudo} bash -c "echo $emailAddr > ${email_file}"
|
||||
break
|
||||
#else
|
||||
# read -p "Please enter the correct email address: " emailAddr
|
||||
#fi
|
||||
else
|
||||
break
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -957,11 +957,16 @@ static void balanceMonitorDnodeModule() {
|
|||
continue;
|
||||
}
|
||||
|
||||
mLInfo("dnode:%d, numOfMnodes:%d expect:%d, add mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes);
|
||||
mnodeAddMnode(pDnode->dnodeId);
|
||||
mLInfo("dnode:%d, numOfMnodes:%d expect:%d, create mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes);
|
||||
mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, true);
|
||||
|
||||
#if 0
|
||||
// Only create one mnode each time
|
||||
return;
|
||||
#else
|
||||
numOfMnodes = mnodeGetMnodesNum();
|
||||
if (numOfMnodes >= tsNumOfMnodes) return;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "tcache.h"
|
||||
#include "tnote.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
#include "tscLog.h"
|
||||
#include "tscSubquery.h"
|
||||
#include "tscUtil.h"
|
||||
|
@ -260,6 +261,9 @@ void taos_close(TAOS *taos) {
|
|||
return;
|
||||
}
|
||||
|
||||
pObj->signature = NULL;
|
||||
taosTmrStopA(&(pObj->pTimer));
|
||||
|
||||
SSqlObj* pHb = pObj->pHb;
|
||||
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
|
||||
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode
|
||||
|
@ -698,8 +702,10 @@ void taos_stop_query(TAOS_RES *res) {
|
|||
tscKillSTableQuery(pSql);
|
||||
} else {
|
||||
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
|
||||
assert(pSql->pRpcCtx != NULL);
|
||||
if (pSql->pRpcCtx != NULL) {
|
||||
rpcCancelRequest(pSql->pRpcCtx);
|
||||
pSql->pRpcCtx = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ extern int32_t tsStatusInterval;
|
|||
extern int32_t tsNumOfMnodes;
|
||||
extern int32_t tsEnableVnodeBak;
|
||||
extern int32_t tsEnableTelemetryReporting;
|
||||
extern char tsEmail[];
|
||||
|
||||
// common
|
||||
extern int tsRpcTimer;
|
||||
|
|
|
@ -42,6 +42,7 @@ int32_t tsStatusInterval = 1; // second
|
|||
int32_t tsNumOfMnodes = 3;
|
||||
int32_t tsEnableVnodeBak = 1;
|
||||
int32_t tsEnableTelemetryReporting = 1;
|
||||
char tsEmail[TSDB_FQDN_LEN] = {0};
|
||||
|
||||
// common
|
||||
int32_t tsRpcTimer = 1000;
|
||||
|
@ -307,6 +308,8 @@ bool taosCfgDynamicOptions(char *msg) {
|
|||
|
||||
static void doInitGlobalConfig(void) {
|
||||
osInit();
|
||||
srand(taosSafeRand());
|
||||
|
||||
SGlobalCfg cfg = {0};
|
||||
|
||||
// ip address
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
|
||||
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
|
|
@ -33,7 +33,8 @@ typedef struct {
|
|||
} SMPeerWorker;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
int32_t curNum;
|
||||
int32_t maxNum;
|
||||
SMPeerWorker *peerWorker;
|
||||
} SMPeerWorkerPool;
|
||||
|
||||
|
@ -46,37 +47,44 @@ static void *dnodeProcessMnodePeerQueue(void *param);
|
|||
int32_t dnodeInitMnodePeer() {
|
||||
tsMPeerQset = taosOpenQset();
|
||||
|
||||
tsMPeerPool.num = 1;
|
||||
tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.num);
|
||||
tsMPeerPool.maxNum = 1;
|
||||
tsMPeerPool.curNum = 0;
|
||||
tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.maxNum);
|
||||
|
||||
if (tsMPeerPool.peerWorker == NULL) return -1;
|
||||
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
|
||||
for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
|
||||
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
|
||||
pWorker->workerId = i;
|
||||
dDebug("dnode mpeer worker:%d is created", i);
|
||||
}
|
||||
|
||||
dInfo("dnode mpeer is opened");
|
||||
dDebug("dnode mpeer is opened, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeCleanupMnodePeer() {
|
||||
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
|
||||
for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
|
||||
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(tsMPeerQset);
|
||||
}
|
||||
dDebug("dnode mpeer worker:%d is closed", i);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
|
||||
for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
|
||||
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
|
||||
dDebug("dnode mpeer worker:%d start to join", i);
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
dDebug("dnode mpeer worker:%d join success", i);
|
||||
}
|
||||
|
||||
dDebug("dnode mpeer is closed, qset:%p", tsMPeerQset);
|
||||
|
||||
taosCloseQset(tsMPeerQset);
|
||||
tsMPeerQset = NULL;
|
||||
taosTFree(tsMPeerPool.peerWorker);
|
||||
dInfo("dnode mpeer is closed");
|
||||
}
|
||||
|
||||
int32_t dnodeAllocateMnodePqueue() {
|
||||
|
@ -85,7 +93,7 @@ int32_t dnodeAllocateMnodePqueue() {
|
|||
|
||||
taosAddIntoQset(tsMPeerQset, tsMPeerQueue, NULL);
|
||||
|
||||
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
|
||||
for (int32_t i = tsMPeerPool.curNum; i < tsMPeerPool.maxNum; ++i) {
|
||||
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
|
||||
pWorker->workerId = i;
|
||||
|
||||
|
@ -98,7 +106,9 @@ int32_t dnodeAllocateMnodePqueue() {
|
|||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.num);
|
||||
|
||||
tsMPeerPool.curNum = i + 1;
|
||||
dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.maxNum);
|
||||
}
|
||||
|
||||
dDebug("dnode mpeer queue:%p is allocated", tsMPeerQueue);
|
||||
|
@ -106,6 +116,7 @@ int32_t dnodeAllocateMnodePqueue() {
|
|||
}
|
||||
|
||||
void dnodeFreeMnodePqueue() {
|
||||
dDebug("dnode mpeer queue:%p is freed", tsMPeerQueue);
|
||||
taosCloseQueue(tsMPeerQueue);
|
||||
tsMPeerQueue = NULL;
|
||||
}
|
||||
|
@ -148,7 +159,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) {
|
|||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) {
|
||||
dDebug("dnodeProcessMnodePeerQueue: got no message from qset, exiting...");
|
||||
dDebug("qset:%p, mnode peer got no message from qset, exiting", tsMPeerQset);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,8 @@ typedef struct {
|
|||
} SMReadWorker;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
int32_t curNum;
|
||||
int32_t maxNum;
|
||||
SMReadWorker *readWorker;
|
||||
} SMReadWorkerPool;
|
||||
|
||||
|
@ -46,40 +47,46 @@ static void *dnodeProcessMnodeReadQueue(void *param);
|
|||
int32_t dnodeInitMnodeRead() {
|
||||
tsMReadQset = taosOpenQset();
|
||||
|
||||
tsMReadPool.num = tsNumOfCores * tsNumOfThreadsPerCore / 2;
|
||||
tsMReadPool.num = MAX(2, tsMReadPool.num);
|
||||
tsMReadPool.num = MIN(4, tsMReadPool.num);
|
||||
tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.num);
|
||||
tsMReadPool.maxNum = tsNumOfCores * tsNumOfThreadsPerCore / 2;
|
||||
tsMReadPool.maxNum = MAX(2, tsMReadPool.maxNum);
|
||||
tsMReadPool.maxNum = MIN(4, tsMReadPool.maxNum);
|
||||
tsMReadPool.curNum = 0;
|
||||
tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.maxNum);
|
||||
|
||||
if (tsMReadPool.readWorker == NULL) return -1;
|
||||
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
|
||||
for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
|
||||
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
|
||||
pWorker->workerId = i;
|
||||
dDebug("dnode mread worker:%d is created", i);
|
||||
}
|
||||
|
||||
dInfo("dnode mread is opened");
|
||||
dDebug("dnode mread is opened, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeCleanupMnodeRead() {
|
||||
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
|
||||
for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
|
||||
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(tsMReadQset);
|
||||
}
|
||||
dDebug("dnode mread worker:%d is closed", i);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
|
||||
for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
|
||||
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
|
||||
dDebug("dnode mread worker:%d start to join", i);
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
dDebug("dnode mread worker:%d start to join", i);
|
||||
}
|
||||
|
||||
taosCloseQset(tsMReadQset);
|
||||
free(tsMReadPool.readWorker);
|
||||
dDebug("dnode mread is closed, qset:%p", tsMReadQset);
|
||||
|
||||
dInfo("dnode mread is closed");
|
||||
taosCloseQset(tsMReadQset);
|
||||
tsMReadQset = NULL;
|
||||
free(tsMReadPool.readWorker);
|
||||
}
|
||||
|
||||
int32_t dnodeAllocateMnodeRqueue() {
|
||||
|
@ -88,7 +95,7 @@ int32_t dnodeAllocateMnodeRqueue() {
|
|||
|
||||
taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL);
|
||||
|
||||
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
|
||||
for (int32_t i = tsMReadPool.curNum; i < tsMReadPool.maxNum; ++i) {
|
||||
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
|
||||
pWorker->workerId = i;
|
||||
|
||||
|
@ -101,7 +108,8 @@ int32_t dnodeAllocateMnodeRqueue() {
|
|||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.num);
|
||||
tsMReadPool.curNum = i + 1;
|
||||
dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.maxNum);
|
||||
}
|
||||
|
||||
dDebug("dnode mread queue:%p is allocated", tsMReadQueue);
|
||||
|
@ -109,6 +117,7 @@ int32_t dnodeAllocateMnodeRqueue() {
|
|||
}
|
||||
|
||||
void dnodeFreeMnodeRqueue() {
|
||||
dDebug("dnode mread queue:%p is freed", tsMReadQueue);
|
||||
taosCloseQueue(tsMReadQueue);
|
||||
tsMReadQueue = NULL;
|
||||
}
|
||||
|
@ -156,7 +165,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) {
|
|||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) {
|
||||
dDebug("dnodeProcessMnodeReadQueue: got no message from qset, exiting...");
|
||||
dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,8 @@ typedef struct {
|
|||
} SMWriteWorker;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
int32_t curNum;
|
||||
int32_t maxNum;
|
||||
SMWriteWorker *writeWorker;
|
||||
} SMWriteWorkerPool;
|
||||
|
||||
|
@ -48,37 +49,44 @@ static void *dnodeProcessMnodeWriteQueue(void *param);
|
|||
int32_t dnodeInitMnodeWrite() {
|
||||
tsMWriteQset = taosOpenQset();
|
||||
|
||||
tsMWritePool.num = 1;
|
||||
tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.num);
|
||||
tsMWritePool.maxNum = 1;
|
||||
tsMWritePool.curNum = 0;
|
||||
tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.maxNum);
|
||||
|
||||
if (tsMWritePool.writeWorker == NULL) return -1;
|
||||
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
|
||||
for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
|
||||
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
|
||||
pWorker->workerId = i;
|
||||
dDebug("dnode mwrite worker:%d is created", i);
|
||||
}
|
||||
|
||||
dInfo("dnode mwrite is opened");
|
||||
dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeCleanupMnodeWrite() {
|
||||
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
|
||||
for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
|
||||
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(tsMWriteQset);
|
||||
}
|
||||
dDebug("dnode mwrite worker:%d is closed", i);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
|
||||
for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
|
||||
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
|
||||
dDebug("dnode mwrite worker:%d start to join", i);
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
dDebug("dnode mwrite worker:%d join success", i);
|
||||
}
|
||||
|
||||
dDebug("dnode mwrite is closed, qset:%p", tsMWriteQset);
|
||||
|
||||
taosCloseQset(tsMWriteQset);
|
||||
tsMWriteQset = NULL;
|
||||
taosTFree(tsMWritePool.writeWorker);
|
||||
dInfo("dnode mwrite is closed");
|
||||
}
|
||||
|
||||
int32_t dnodeAllocateMnodeWqueue() {
|
||||
|
@ -87,7 +95,7 @@ int32_t dnodeAllocateMnodeWqueue() {
|
|||
|
||||
taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL);
|
||||
|
||||
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
|
||||
for (int32_t i = tsMWritePool.curNum; i < tsMWritePool.maxNum; ++i) {
|
||||
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
|
||||
pWorker->workerId = i;
|
||||
|
||||
|
@ -100,7 +108,8 @@ int32_t dnodeAllocateMnodeWqueue() {
|
|||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.num);
|
||||
tsMWritePool.curNum = i + 1;
|
||||
dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.maxNum);
|
||||
}
|
||||
|
||||
dDebug("dnode mwrite queue:%p is allocated", tsMWriteQueue);
|
||||
|
@ -108,6 +117,7 @@ int32_t dnodeAllocateMnodeWqueue() {
|
|||
}
|
||||
|
||||
void dnodeFreeMnodeWqueue() {
|
||||
dDebug("dnode mwrite queue:%p is freed", tsMWriteQueue);
|
||||
taosCloseQueue(tsMWriteQueue);
|
||||
tsMWriteQueue = NULL;
|
||||
}
|
||||
|
@ -122,11 +132,15 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
|
|||
SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
|
||||
mnodeCreateMsg(pWrite, pMsg);
|
||||
|
||||
dDebug("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]);
|
||||
dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
|
||||
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
|
||||
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
|
||||
}
|
||||
|
||||
static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
|
||||
dDebug("app:%p:%p, msg:%s is freed from mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
|
||||
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
|
||||
|
||||
mnodeCleanupMsg(pWrite);
|
||||
taosFreeQitem(pWrite);
|
||||
}
|
||||
|
@ -158,7 +172,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) {
|
|||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
|
||||
dDebug("dnodeProcessMnodeWriteQueue: got no message from qset, exiting...");
|
||||
dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -182,8 +196,8 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) {
|
|||
dnodeSendRedirectMsg(pMsg, true);
|
||||
dnodeFreeMnodeWriteMsg(pWrite);
|
||||
} else {
|
||||
dDebug("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
|
||||
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
|
||||
dDebug("app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
|
||||
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry);
|
||||
|
||||
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
|
||||
}
|
||||
|
|
|
@ -74,6 +74,7 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
|
|||
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
|
||||
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
|
||||
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
|
||||
|
||||
int32_t dnodeInitMgmt() {
|
||||
|
@ -82,6 +83,7 @@ int32_t dnodeInitMgmt() {
|
|||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
|
||||
|
||||
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
|
||||
dnodeReadDnodeCfg();
|
||||
|
@ -226,7 +228,7 @@ static void *dnodeProcessMgmtQueue(void *param) {
|
|||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) {
|
||||
dDebug("dnode mgmt got no message from qset, exit ...");
|
||||
dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -451,10 +453,34 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
|
||||
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont;
|
||||
SMDCfgDnodeMsg *pCfg = pMsg->pCont;
|
||||
return taosCfgDynamicOptions(pCfg->config);
|
||||
}
|
||||
|
||||
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
|
||||
SMDCreateMnodeMsg *pCfg = pMsg->pCont;
|
||||
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||
if (pCfg->dnodeId != dnodeGetDnodeId()) {
|
||||
dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
|
||||
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
|
||||
}
|
||||
|
||||
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
|
||||
dError("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
|
||||
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
|
||||
}
|
||||
|
||||
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.nodeNum);
|
||||
for (int i = 0; i < pCfg->mnodes.nodeNum; ++i) {
|
||||
pCfg->mnodes.nodeInfos[i].nodeId = htonl(pCfg->mnodes.nodeInfos[i].nodeId);
|
||||
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.nodeInfos[i].nodeId, pCfg->mnodes.nodeInfos[i].nodeEp);
|
||||
}
|
||||
|
||||
dnodeStartMnode(&pCfg->mnodes);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
|
||||
if (pEpSet->numOfEps <= 0) {
|
||||
dError("mnode EP list for peer is changed, but content is invalid, discard it");
|
||||
|
@ -465,29 +491,6 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
|
|||
for (int i = 0; i < pEpSet->numOfEps; ++i) {
|
||||
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
|
||||
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
|
||||
|
||||
if (!mnodeIsRunning()) {
|
||||
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
|
||||
dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]);
|
||||
bool find = false;
|
||||
for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) {
|
||||
if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) {
|
||||
dInfo("localEp found in mnode infos");
|
||||
find = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!find) {
|
||||
dInfo("localEp not found in mnode infos, will set into mnode infos");
|
||||
tstrncpy(tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeEp, tsLocalEp, TSDB_EP_LEN);
|
||||
tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeId = dnodeGetDnodeId();
|
||||
tsDMnodeInfos.nodeNum++;
|
||||
}
|
||||
|
||||
dnodeStartMnode();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tsDMnodeEpSet = *pEpSet;
|
||||
|
@ -532,7 +535,9 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
|
||||
dnodeProcessModuleStatus(pCfg->moduleStatus);
|
||||
|
||||
// will not set mnode in status msg
|
||||
// dnodeProcessModuleStatus(pCfg->moduleStatus);
|
||||
dnodeUpdateDnodeCfg(pCfg);
|
||||
|
||||
dnodeUpdateMnodeInfos(pMnodes);
|
||||
|
@ -576,7 +581,7 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
|
|||
}
|
||||
|
||||
dnodeSaveMnodeInfos();
|
||||
sdbUpdateSync();
|
||||
sdbUpdateAsync();
|
||||
}
|
||||
|
||||
static bool dnodeReadMnodeInfos() {
|
||||
|
|
|
@ -146,7 +146,9 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
|
|||
}
|
||||
}
|
||||
|
||||
bool dnodeStartMnode() {
|
||||
bool dnodeStartMnode(void *pMnodes) {
|
||||
SDMMnodeInfos *mnodes = pMnodes;
|
||||
|
||||
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
|
||||
dDebug("mnode module is already started, module status:%d", tsModuleStatus);
|
||||
return false;
|
||||
|
@ -156,6 +158,7 @@ bool dnodeStartMnode() {
|
|||
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
|
||||
dnodeProcessModuleStatus(moduleStatus);
|
||||
|
||||
sdbUpdateSync();
|
||||
sdbUpdateSync(mnodes);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ int32_t dnodeInitServer() {
|
|||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue;
|
||||
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue;
|
||||
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue;
|
||||
|
@ -170,8 +171,12 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
|
|||
rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg);
|
||||
}
|
||||
|
||||
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
|
||||
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
|
||||
SRpcEpSet epSet = {0};
|
||||
dnodeGetMnodeEpSetForPeer(&epSet);
|
||||
rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp);
|
||||
}
|
||||
|
||||
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
|
||||
rpcSendRecv(tsDnodeClientRpc, epSet, rpcMsg, rpcRsp);
|
||||
}
|
|
@ -156,7 +156,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
|
|||
|
||||
dDebug("user:%s, send auth msg to mnodes", user);
|
||||
SRpcMsg rpcRsp = {0};
|
||||
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);
|
||||
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
|
||||
|
||||
if (rpcRsp.code != 0) {
|
||||
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
|
||||
|
@ -189,7 +189,7 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) {
|
|||
rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE;
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);
|
||||
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
|
||||
terrno = rpcRsp.code;
|
||||
|
||||
if (rpcRsp.code != 0) {
|
||||
|
|
|
@ -178,6 +178,7 @@ static void addVersionInfo(SBufferWriter* bw) {
|
|||
addStringField(bw, "version", version);
|
||||
addStringField(bw, "buildInfo", buildinfo);
|
||||
addStringField(bw, "gitInfo", gitinfo);
|
||||
addStringField(bw, "email", tsEmail);
|
||||
}
|
||||
|
||||
static void addRuntimeInfo(SBufferWriter* bw) {
|
||||
|
@ -261,11 +262,27 @@ static void* telemetryThread(void* param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void dnodeGetEmail(char* filepath) {
|
||||
int fd = open(filepath, O_RDONLY);
|
||||
if (fd < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (taosTRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
|
||||
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
|
||||
}
|
||||
|
||||
close(fd);
|
||||
}
|
||||
|
||||
|
||||
int32_t dnodeInitTelemetry() {
|
||||
if (!tsEnableTelemetryReporting) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
dnodeGetEmail("/usr/local/taos/email");
|
||||
|
||||
if (tsem_init(&tsExitSem, 0, 0) == -1) {
|
||||
// just log the error, it is ok for telemetry to fail
|
||||
dTrace("failed to create semaphore for telemetry, reason:%s", strerror(errno));
|
||||
|
|
|
@ -199,7 +199,7 @@ static void *dnodeProcessReadQueue(void *param) {
|
|||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
|
||||
dDebug("dnodeProcessReadQueee: got no message from qset, exiting...");
|
||||
dDebug("qset:%p dnode read got no message from qset, exiting", readQset);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -222,7 +222,7 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
|
||||
if (numOfMsgs == 0) {
|
||||
dDebug("dnodeProcessWriteQueee: got no message from qset, exiting...");
|
||||
dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,11 +43,12 @@ void dnodeGetMnodeEpSetForPeer(void *epSet);
|
|||
void dnodeGetMnodeEpSetForShell(void *epSet);
|
||||
void * dnodeGetMnodeInfos();
|
||||
int32_t dnodeGetDnodeId();
|
||||
bool dnodeStartMnode();
|
||||
bool dnodeStartMnode(void *pModes);
|
||||
|
||||
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
||||
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
|
||||
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
|
||||
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
|
||||
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
|
||||
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid);
|
||||
|
||||
void *dnodeAllocateVnodeWqueue(void *pVnode);
|
||||
|
|
|
@ -60,7 +60,8 @@ int32_t mnodeInitSystem();
|
|||
int32_t mnodeStartSystem();
|
||||
void mnodeCleanupSystem();
|
||||
void mnodeStopSystem();
|
||||
void sdbUpdateSync();
|
||||
void sdbUpdateAsync();
|
||||
void sdbUpdateSync(void *pMnodes);
|
||||
bool mnodeIsRunning();
|
||||
int32_t mnodeProcessRead(SMnodeMsg *pMsg);
|
||||
int32_t mnodeProcessWrite(SMnodeMsg *pMsg);
|
||||
|
|
|
@ -139,6 +139,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, 0, 0x0339, "Vgroup alr
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, 0, 0x033A, "Dnode not avaliable")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, 0, 0x033B, "Cluster id not match")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, 0, 0x033C, "Cluster not ready")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, 0, 0x033D, "Dnode Id not configured")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED, 0, 0x033E, "Dnode Ep not configured")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "Account already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "Invalid account")
|
||||
|
|
|
@ -59,7 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
|
|||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
|
||||
|
||||
|
@ -719,6 +719,12 @@ typedef struct {
|
|||
char ep[TSDB_EP_LEN]; // end point, hostname:port
|
||||
} SCMCreateDnodeMsg, SCMDropDnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port
|
||||
SDMMnodeInfos mnodes;
|
||||
} SMDCreateMnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int32_t vgId;
|
||||
|
|
|
@ -31,7 +31,7 @@ typedef enum {
|
|||
int32_t mnodeInitMnodes();
|
||||
void mnodeCleanupMnodes();
|
||||
|
||||
int32_t mnodeAddMnode(int32_t dnodeId);
|
||||
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm);
|
||||
int32_t mnodeDropMnode(int32_t dnodeId);
|
||||
void mnodeDropMnodeLocal(int32_t dnodeId);
|
||||
|
||||
|
|
|
@ -224,7 +224,7 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows,
|
|||
mnodeDecClusterRef(pCluster);
|
||||
numOfRows++;
|
||||
}
|
||||
|
||||
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
|
|
@ -147,7 +147,7 @@ static int32_t mnodeDnodeActionRestored() {
|
|||
mnodeCreateDnode(tsLocalEp, NULL);
|
||||
SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp);
|
||||
if (pDnode != NULL) {
|
||||
mnodeAddMnode(pDnode->dnodeId);
|
||||
mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, false);
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
}
|
||||
}
|
||||
|
@ -857,6 +857,7 @@ int32_t mnodeRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pC
|
|||
|
||||
char* pWrite;
|
||||
char* moduleName[5] = { "MNODE", "HTTP", "MONITOR", "MQTT", "UNKNOWN" };
|
||||
int32_t cols;
|
||||
|
||||
while (numOfRows < rows) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
|
@ -864,7 +865,7 @@ int32_t mnodeRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pC
|
|||
if (pDnode == NULL) break;
|
||||
|
||||
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
|
||||
int32_t cols = 0;
|
||||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = pDnode->dnodeId;
|
||||
|
@ -890,6 +891,7 @@ int32_t mnodeRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pC
|
|||
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
}
|
||||
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
|
||||
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
|
@ -1081,6 +1083,7 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
|
|||
} else {
|
||||
numOfRows = 0;
|
||||
}
|
||||
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
|
||||
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
|
|
|
@ -109,7 +109,7 @@ int32_t mnodeStartSystem() {
|
|||
|
||||
mInfo("mnode is initialized successfully");
|
||||
|
||||
sdbUpdateSync();
|
||||
sdbUpdateSync(NULL);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@
|
|||
#include "tutil.h"
|
||||
#include "tsocket.h"
|
||||
#include "tdataformat.h"
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "mnodeDef.h"
|
||||
#include "mnodeInt.h"
|
||||
#include "mnodeMnode.h"
|
||||
|
@ -30,6 +32,7 @@
|
|||
#include "mnodeSdb.h"
|
||||
#include "mnodeShow.h"
|
||||
#include "mnodeUser.h"
|
||||
#include "mnodeVgroup.h"
|
||||
|
||||
static void * tsMnodeSdb = NULL;
|
||||
static int32_t tsMnodeUpdateSize = 0;
|
||||
|
@ -266,7 +269,61 @@ void mnodeGetMnodeInfos(void *mnodeInfos) {
|
|||
mnodeMnodeUnLock();
|
||||
}
|
||||
|
||||
int32_t mnodeAddMnode(int32_t dnodeId) {
|
||||
static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
|
||||
mDebug("dnode:%d, send create mnode msg to dnode %s", dnodeId, dnodeEp);
|
||||
|
||||
SMDCreateMnodeMsg *pCreate = rpcMallocCont(sizeof(SMDCreateMnodeMsg));
|
||||
if (pCreate == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
pCreate->dnodeId = htonl(dnodeId);
|
||||
tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
|
||||
pCreate->mnodes = tsMnodeInfos;
|
||||
bool found = false;
|
||||
for (int i = 0; i < pCreate->mnodes.nodeNum; ++i) {
|
||||
if (pCreate->mnodes.nodeInfos[i].nodeId == htonl(dnodeId)) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeId = htonl(dnodeId);
|
||||
tstrncpy(pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
|
||||
pCreate->mnodes.nodeNum++;
|
||||
}
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.pCont = pCreate;
|
||||
rpcMsg.contLen = sizeof(SMDCreateMnodeMsg);
|
||||
rpcMsg.msgType = TSDB_MSG_TYPE_MD_CREATE_MNODE;
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
SRpcEpSet epSet = mnodeGetEpSetFromIp(pCreate->dnodeEp);
|
||||
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp, &epSet);
|
||||
|
||||
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
|
||||
mError("dnode:%d, failed to send create mnode msg, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(rpcRsp.code));
|
||||
} else {
|
||||
mDebug("dnode:%d, create mnode msg is disposed, mnode is created in dnode", dnodeId);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcRsp.pCont);
|
||||
return rpcRsp.code;
|
||||
}
|
||||
|
||||
static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("failed to create mnode, reason:%s", tstrerror(code));
|
||||
} else {
|
||||
mDebug("mnode is created successfully");
|
||||
mnodeUpdateMnodeEpSet();
|
||||
sdbUpdateAsync();
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
|
||||
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
|
||||
pMnode->mnodeId = dnodeId;
|
||||
pMnode->createdTime = taosGetTimestampMs();
|
||||
|
@ -275,16 +332,24 @@ int32_t mnodeAddMnode(int32_t dnodeId) {
|
|||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsMnodeSdb,
|
||||
.pObj = pMnode,
|
||||
.writeCb = mnodeCreateMnodeCb
|
||||
};
|
||||
|
||||
int32_t code = sdbInsertRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
taosTFree(pMnode);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (needConfirm) {
|
||||
code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp);
|
||||
}
|
||||
|
||||
mnodeUpdateMnodeEpSet();
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosTFree(pMnode);
|
||||
return;
|
||||
}
|
||||
|
||||
return code;
|
||||
code = sdbInsertRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code));
|
||||
taosTFree(pMnode);
|
||||
}
|
||||
}
|
||||
|
||||
void mnodeDropMnodeLocal(int32_t dnodeId) {
|
||||
|
@ -296,6 +361,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) {
|
|||
}
|
||||
|
||||
mnodeUpdateMnodeEpSet();
|
||||
sdbUpdateAsync();
|
||||
}
|
||||
|
||||
int32_t mnodeDropMnode(int32_t dnodeId) {
|
||||
|
@ -315,6 +381,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
|
|||
sdbDecRef(tsMnodeSdb, pMnode);
|
||||
|
||||
mnodeUpdateMnodeEpSet();
|
||||
sdbUpdateAsync();
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -413,6 +480,7 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
|
|||
|
||||
mnodeDecMnodeRef(pMnode);
|
||||
}
|
||||
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
|
||||
|
||||
pShow->numOfReads += numOfRows;
|
||||
|
||||
|
|
|
@ -58,10 +58,15 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
|
|||
rpcRsp->rsp = epSet;
|
||||
rpcRsp->len = sizeof(SRpcEpSet);
|
||||
|
||||
mDebug("%p, msg:%s in mpeer queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
|
||||
mDebug("%p, msg:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
|
||||
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
|
||||
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
|
||||
mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i]));
|
||||
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
|
||||
epSet->inUse = (i + 1) % epSet->numOfEps;
|
||||
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
|
||||
} else {
|
||||
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_RPC_REDIRECT;
|
||||
|
|
|
@ -51,14 +51,21 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
|
|||
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
|
||||
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
|
||||
mnodeGetMnodeEpSetForShell(epSet);
|
||||
|
||||
mDebug("%p, msg:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
|
||||
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
|
||||
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
|
||||
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
|
||||
epSet->inUse = (i + 1) % epSet->numOfEps;
|
||||
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
|
||||
} else {
|
||||
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
|
||||
}
|
||||
}
|
||||
|
||||
rpcRsp->rsp = epSet;
|
||||
rpcRsp->len = sizeof(SRpcEpSet);
|
||||
|
||||
mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse);
|
||||
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
|
||||
mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i]));
|
||||
}
|
||||
|
||||
return TSDB_CODE_RPC_REDIRECT;
|
||||
}
|
||||
|
||||
|
|
|
@ -91,6 +91,7 @@ typedef struct {
|
|||
} SSdbWriteWorkerPool;
|
||||
|
||||
extern void * tsMnodeTmr;
|
||||
static void * tsUpdateSyncTmr;
|
||||
static SSdbObject tsSdbObj = {0};
|
||||
static taos_qset tsSdbWriteQset;
|
||||
static taos_qall tsSdbWriteQall;
|
||||
|
@ -297,27 +298,25 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
|||
taosFreeQitem(pOper);
|
||||
}
|
||||
|
||||
void sdbUpdateSync() {
|
||||
static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); }
|
||||
|
||||
void sdbUpdateAsync() {
|
||||
taosTmrReset(sdbUpdateSyncTmrFp, 200, NULL, tsMnodeTmr, &tsUpdateSyncTmr);
|
||||
}
|
||||
|
||||
void sdbUpdateSync(void *pMnodes) {
|
||||
SDMMnodeInfos *mnodes = pMnodes;
|
||||
if (!mnodeIsRunning()) {
|
||||
mDebug("mnode not start yet, update sync info later");
|
||||
mDebug("mnode not start yet, update sync config later");
|
||||
return;
|
||||
}
|
||||
|
||||
mDebug("update sync info in sdb");
|
||||
mDebug("update sync config in sync module, mnodes:%p", pMnodes);
|
||||
|
||||
SSyncCfg syncCfg = {0};
|
||||
int32_t index = 0;
|
||||
|
||||
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
|
||||
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
|
||||
SDMMnodeInfo *node = &mnodes->nodeInfos[i];
|
||||
syncCfg.nodeInfo[i].nodeId = node->nodeId;
|
||||
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[i].nodeFqdn, &syncCfg.nodeInfo[i].nodePort);
|
||||
syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
|
||||
index++;
|
||||
}
|
||||
|
||||
if (index == 0) {
|
||||
if (mnodes == NULL) {
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMnodeObj *pMnode = NULL;
|
||||
|
@ -337,9 +336,19 @@ void sdbUpdateSync() {
|
|||
mnodeDecMnodeRef(pMnode);
|
||||
}
|
||||
sdbFreeIter(pIter);
|
||||
syncCfg.replica = index;
|
||||
mDebug("mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica);
|
||||
} else {
|
||||
for (index = 0; index < mnodes->nodeNum; ++index) {
|
||||
SDMMnodeInfo *node = &mnodes->nodeInfos[index];
|
||||
syncCfg.nodeInfo[index].nodeId = node->nodeId;
|
||||
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort);
|
||||
syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC;
|
||||
}
|
||||
syncCfg.replica = index;
|
||||
mDebug("mnodes info input, numOfMnodes:%d", syncCfg.replica);
|
||||
}
|
||||
|
||||
syncCfg.replica = index;
|
||||
syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
|
||||
|
||||
bool hasThisDnode = false;
|
||||
|
@ -350,8 +359,15 @@ void sdbUpdateSync() {
|
|||
}
|
||||
}
|
||||
|
||||
if (!hasThisDnode) return;
|
||||
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return;
|
||||
if (!hasThisDnode) {
|
||||
sdbDebug("update sync config, this dnode not exist");
|
||||
return;
|
||||
}
|
||||
|
||||
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) {
|
||||
sdbDebug("update sync config, info not changed");
|
||||
return;
|
||||
}
|
||||
|
||||
sdbInfo("work as mnode, replica:%d", syncCfg.replica);
|
||||
for (int32_t i = 0; i < syncCfg.replica; ++i) {
|
||||
|
@ -1038,7 +1054,7 @@ static void *sdbWorkerFp(void *param) {
|
|||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed);
|
||||
if (numOfMsgs == 0) {
|
||||
sdbDebug("sdbWorkerFp: got no message from qset, exiting...");
|
||||
sdbDebug("qset:%p, sdb got no message from qset, exiting", tsSdbWriteQset);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -385,6 +385,7 @@ static int32_t mnodeRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, voi
|
|||
numOfRows++;
|
||||
mnodeDecUserRef(pUser);
|
||||
}
|
||||
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
|
||||
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
|
|
|
@ -310,7 +310,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
|
|||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||
if (pVgid->pDnode == pDnode) {
|
||||
mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d", pDnode->dnodeId, pVgroup->vgId, pVgid->role);
|
||||
mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d:%s", pDnode->dnodeId, pVgroup->vgId, pVgid->role, syncRole[pVgid->role]);
|
||||
pVgid->role = pVload->role;
|
||||
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
|
||||
pVgroup->inUse = i;
|
||||
|
@ -771,6 +771,7 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
|
|||
mnodeDecVgroupRef(pVgroup);
|
||||
numOfRows++;
|
||||
}
|
||||
mnodeVacuumResult(data, cols, numOfRows, rows, pShow);
|
||||
|
||||
pShow->numOfReads += numOfRows;
|
||||
mnodeDecTableRef(pTable);
|
||||
|
|
|
@ -54,12 +54,18 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
|
|||
rpcRsp->rsp = epSet;
|
||||
rpcRsp->len = sizeof(SRpcEpSet);
|
||||
|
||||
mDebug("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType],
|
||||
epSet->inUse);
|
||||
mDebug("app:%p:%p, msg:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg,
|
||||
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
|
||||
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
|
||||
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
|
||||
epSet->inUse = (i + 1) % epSet->numOfEps;
|
||||
mDebug("app:%p:%p, mnode index:%d ep:%s:%d, set inUse to %d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
|
||||
htons(epSet->port[i]), epSet->inUse);
|
||||
} else {
|
||||
mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
|
||||
htons(epSet->port[i]));
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_RPC_REDIRECT;
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ static void *httpProcessResultQueue(void *param) {
|
|||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
|
||||
httpDebug("httpResultQueue: got no message from qset, exiting...");
|
||||
httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset);
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -4511,7 +4511,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
pQuery->precision = tsdbGetCfg(tsdb)->precision;
|
||||
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
|
||||
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
|
||||
|
||||
|
@ -6324,6 +6323,8 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
|
|||
UNUSED(ret);
|
||||
}
|
||||
|
||||
pQuery->precision = tsdbGetCfg(tsdb)->precision;
|
||||
|
||||
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
|
||||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
|
||||
qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
|
||||
|
|
|
@ -542,10 +542,7 @@ void rpcCancelRequest(void *handle) {
|
|||
|
||||
if (pContext->pConn) {
|
||||
tDebug("%s, app tries to cancel request", pContext->pConn->info);
|
||||
pContext->pConn->pReqMsg = NULL;
|
||||
rpcCloseConn(pContext->pConn);
|
||||
pContext->pConn = NULL;
|
||||
rpcFreeCont(pContext->pCont);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -613,9 +610,11 @@ static void rpcReleaseConn(SRpcConn *pConn) {
|
|||
if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg
|
||||
} else {
|
||||
// if there is an outgoing message, free it
|
||||
if (pConn->outType && pConn->pReqMsg)
|
||||
if (pConn->outType && pConn->pReqMsg) {
|
||||
if (pConn->pContext) pConn->pContext->pConn = NULL;
|
||||
rpcFreeMsg(pConn->pReqMsg);
|
||||
}
|
||||
}
|
||||
|
||||
// memset could not be used, since lockeBy can not be reset
|
||||
pConn->inType = 0;
|
||||
|
@ -1121,9 +1120,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
|
|||
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
|
||||
if (pEpSet->numOfEps > 0) {
|
||||
memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
|
||||
tDebug("%s, redirect is received, numOfEps:%d", pConn->info, pContext->epSet.numOfEps);
|
||||
for (int i=0; i<pContext->epSet.numOfEps; ++i)
|
||||
tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
|
||||
pContext->epSet.inUse);
|
||||
for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
|
||||
pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
|
||||
tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i],
|
||||
pContext->epSet.port[i]);
|
||||
}
|
||||
}
|
||||
rpcSendReqToServer(pRpc, pContext);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
|
|
|
@ -525,7 +525,7 @@ static void *taosProcessTcpData(void *param) {
|
|||
while (pThreadObj->pHead) {
|
||||
SFdObj *pFdObj = pThreadObj->pHead;
|
||||
pThreadObj->pHead = pFdObj->next;
|
||||
taosFreeFdObj(pFdObj);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&(pThreadObj->mutex));
|
||||
|
|
|
@ -215,6 +215,9 @@ void syncStop(void *param) {
|
|||
|
||||
pthread_mutex_lock(&(pNode->mutex));
|
||||
|
||||
if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
|
||||
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
|
||||
|
||||
for (int i = 0; i < pNode->replica; ++i) {
|
||||
pPeer = pNode->peerInfo[i];
|
||||
if (pPeer) syncRemovePeer(pPeer);
|
||||
|
@ -223,9 +226,6 @@ void syncStop(void *param) {
|
|||
pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
|
||||
if (pPeer) syncRemovePeer(pPeer);
|
||||
|
||||
if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
|
||||
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
|
||||
|
||||
pthread_mutex_unlock(&(pNode->mutex));
|
||||
|
||||
syncDecNodeRef(pNode);
|
||||
|
@ -313,6 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
|
|||
|
||||
// always update version
|
||||
nodeVersion = pWalHead->version;
|
||||
sDebug("replica:%d nodeRole:%d qtype:%d", pNode->replica, nodeRole, qtype);
|
||||
|
||||
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
|
||||
|
||||
// only pkt from RPC or CQ can be forwarded
|
||||
|
@ -1189,6 +1191,8 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
|
|||
static void syncMonitorFwdInfos(void *param, void *tmrId) {
|
||||
SSyncNode *pNode = param;
|
||||
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
||||
if (pSyncFwds == NULL) return;
|
||||
|
||||
uint64_t time = taosGetTimestampMs();
|
||||
|
||||
if (pSyncFwds->fwds > 0) {
|
||||
|
|
|
@ -2639,8 +2639,7 @@ int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STa
|
|||
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
|
||||
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||
|
||||
int32_t i = 0;
|
||||
for(; i < size; ++i) {
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
STableIdInfo *id = taosArrayGet(pTableIdList, i);
|
||||
|
||||
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid);
|
||||
|
@ -2665,8 +2664,12 @@ int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STa
|
|||
return terrno;
|
||||
}
|
||||
|
||||
pGroupInfo->numOfTables = i;
|
||||
pGroupInfo->numOfTables = taosArrayGetSize(group);
|
||||
if (pGroupInfo->numOfTables > 0) {
|
||||
taosArrayPush(pGroupInfo->pGroupList, &group);
|
||||
} else {
|
||||
taosArrayDestroy(group);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -263,6 +263,7 @@ void taosCloseQset(taos_qset param) {
|
|||
// thread to exit.
|
||||
void taosQsetThreadResume(taos_qset param) {
|
||||
STaosQset *qset = (STaosQset *)param;
|
||||
uDebug("qset:%p, it will exit", qset);
|
||||
tsem_post(&qset->sem);
|
||||
}
|
||||
|
||||
|
|
|
@@ -427,7 +427,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
    if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
      wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      ASSERT(false);
      // ASSERT(false);
      break;
    }
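With the assert commented out, the branch follows a common recovery pattern: record the error and stop replaying the rest of the file rather than aborting the process. A sketch of that control flow with a trivial stand-in checksum; the real code checks the WAL head, this is only illustrative:

    #include <stdint.h>
    #include <stdio.h>

    /* Trivial stand-in checksum; the real code verifies the WAL head with a proper checksum. */
    static uint8_t sum8(const uint8_t *p, size_t n) {
      uint8_t s = 0;
      for (size_t i = 0; i < n; ++i) s = (uint8_t)(s + p[i]);
      return s;
    }

    static int replay(const uint8_t *records, const size_t *lens, const uint8_t *sums, int count) {
      int replayed = 0;
      for (int i = 0; i < count; ++i) {
        if (sum8(records, lens[i]) != sums[i]) {
          fprintf(stderr, "record %d: checksum mismatch, skipping rest of file\n", i);
          break;                           /* stop replaying instead of aborting */
        }
        /* ... apply the record ... */
        records += lens[i];
        ++replayed;
      }
      return replayed;
    }

    int main(void) {
      uint8_t data[] = { 1, 2, 3, 4 };
      size_t  lens[] = { 2, 2 };
      uint8_t sums[] = { 3, 99 };          /* second checksum is deliberately wrong */
      printf("replayed %d record(s)\n", replay(data, lens, sums, 2));
      return 0;
    }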
@@ -44,7 +44,7 @@
    <dependency>
      <groupId>com.taosdata.jdbc</groupId>
      <artifactId>taos-jdbcdriver</artifactId>
      <version>2.0.7</version>
      <version>2.0.8</version>
    </dependency>

  </dependencies>
@@ -42,11 +42,37 @@ TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1`

LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6|rev`/lib

# Now getting ready to execute Python
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
PYTHON_EXEC=python3.8

# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd)

# Then let us set up the library path so that our compiled SO file can be loaded by Python
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR

# Now we are all let, and let's see if we can find a crash. Note we pass all params
python3.8 ./crash_gen.py $@
if [[ $1 == '--valgrind' ]]; then
    shift
    export PYTHONMALLOC=malloc
    VALGRIND_OUT=valgrind.out
    VALGRIND_ERR=valgrind.err
    # How to generate valgrind suppression file: https://stackoverflow.com/questions/17159578/generating-suppressions-for-memory-leaks
    # valgrind --leak-check=full --gen-suppressions=all --log-fd=9 python3.8 ./crash_gen.py $@ 9>>memcheck.log
    echo Executing under VALGRIND, with STDOUT/ERR going to $VALGRIND_OUT and $VALGRIND_ERR, please watch them from a different terminal.
    valgrind \
        --leak-check=yes \
        --suppressions=crash_gen/valgrind_taos.supp \
        $PYTHON_EXEC \
        ./crash_gen/crash_gen.py $@ > $VALGRIND_OUT 2> $VALGRIND_ERR
elif [[ $1 == '--helgrind' ]]; then
    shift
    valgrind \
        --tool=helgrind \
        $PYTHON_EXEC \
        ./crash_gen/crash_gen.py $@
else
    $PYTHON_EXEC ./crash_gen/crash_gen.py $@
fi
Two file diffs suppressed because they are too large.
@@ -272,6 +272,7 @@ cd ../../../debug; make
./test.sh -f unique/db/replica_part.sim

./test.sh -f unique/dnode/alternativeRole.sim
./test.sh -f unique/dnode/simple.sim
./test.sh -f unique/dnode/balance1.sim
./test.sh -f unique/dnode/balance2.sim
./test.sh -f unique/dnode/balance3.sim
@@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "vDebugFlag 135" >> $TAOS_CFG
echo "tsdbDebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "mqttDebugFlag 135" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 143" >> $TAOS_CFG
echo "sdbDebugFlag 143" >> $TAOS_CFG
echo "dDebugFlag 143" >> $TAOS_CFG
echo "vDebugFlag 143" >> $TAOS_CFG
echo "tsdbDebugFlag 143" >> $TAOS_CFG
echo "cDebugFlag 143" >> $TAOS_CFG
echo "jnidebugFlag 143" >> $TAOS_CFG
echo "odbcdebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 143" >> $TAOS_CFG
echo "mqttDebugFlag 143" >> $TAOS_CFG
echo "qdebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG
echo "wdebugFlag 135" >> $TAOS_CFG
echo "cqdebugFlag 135" >> $TAOS_CFG
echo "udebugFlag 143" >> $TAOS_CFG
echo "sdebugFlag 143" >> $TAOS_CFG
echo "wdebugFlag 143" >> $TAOS_CFG
echo "cqdebugFlag 143" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG
@@ -109,15 +109,10 @@ echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "scriptDir ${CODE_DIR}/../script" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 143" >> $TAOS_CFG
echo "udebugFlag 143" >> $TAOS_CFG
echo "tablemetakeeptimer 5" >> $TAOS_CFG
echo "wal 0" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
@@ -27,7 +27,16 @@ system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start

sleep 5000
sleep 3000

$x = 0
show2:
$x = $x + 1
sleep 2000
if $x == 10 then
  return -1
endi

sql show mnodes
$dnode1Role = $data2_1
$dnode2Role = $data2_2

@@ -37,6 +46,16 @@ print $dnode1Role
print $dnode2Role
print $dnode3Role

if $dnode1Role != master then
  goto show2
endi
if $dnode2Role != slave then
  goto show2
endi
if $dnode3Role != slave then
  goto show2
endi

print ============================== step3
$count = 2
while $count < 102
@@ -0,0 +1,147 @@
system sh/stop_dnodes.sh

system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4

print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect

sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start
sleep 3000

sql create database d1 replica 2
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(now+1s, 15)
sql insert into d1.t1 values(now+2s, 14)
sql insert into d1.t1 values(now+3s, 13)
sql insert into d1.t1 values(now+4s, 12)
sql insert into d1.t1 values(now+5s, 11)

sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
print dnode3 openVnodes $data2_3
print dnode4 openVnodes $data2_4
if $data2_1 != 0 then
  return -1
endi
if $data2_2 != 1 then
  return -1
endi
if $data2_3 != 1 then
  return -1
endi
if $data2_4 != null then
  return -1
endi

print ========== step2
sql create dnode $hostname4
system sh/exec.sh -n dnode4 -s start
sleep 3000

sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
print dnode3 openVnodes $data2_3
print dnode4 openVnodes $data2_4
if $data2_1 != 0 then
  return -1
endi
if $data2_2 != 1 then
  return -1
endi
if $data2_3 != 1 then
  return -1
endi
if $data2_4 != 0 then
  return -1
endi

print ========== step3
sql drop dnode $hostname2

$x = 0
show3:
$x = $x + 1
sleep 2000
if $x == 10 then
  return -1
endi

sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
print dnode3 openVnodes $data2_3
print dnode4 openVnodes $data2_4
if $data2_1 != 0 then
  goto show3
endi
if $data2_2 != null then
  goto show3
endi
if $data2_3 != 1 then
  goto show3
endi
if $data2_4 != 1 then
  goto show3
endi

print ========== step4
sql drop dnode $hostname3

$x = 0
show4:
$x = $x + 1
sleep 2000
if $x == 10 then
  return -1
endi

sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
print dnode3 openVnodes $data2_3
print dnode4 openVnodes $data2_4
if $data2_1 != 1 then
  goto show4
endi
if $data2_2 != null then
  goto show4
endi
if $data2_3 != null then
  goto show4
endi
if $data2_4 != 1 then
  goto show4
endi

print ========== step5
sql select * from d1.t1 order by t desc
print $data01 $data11 $data21 $data31 $data41
if $data01 != 11 then
  return -1
endi
if $data11 != 12 then
  return -1
endi
if $data21 != 13 then
  return -1
endi
if $data31 != 14 then
  return -1
endi
if $data41 != 15 then
  return -1
endi

system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
@@ -26,11 +26,11 @@ $x = 0
show2:
$x = $x + 1
sleep 2000
if $x == 10 then
if $x == 5 then
  return -1
endi

sql show mnodes
sql show mnodes -x show2
print dnode1 ==> $data2_1
print dnode2 ==> $data2_2
if $data2_1 != master then