Merge from develop

This commit is contained in:
Shengliang Guan 2020-12-03 10:10:18 +00:00
commit 01c9bde5ce
42 changed files with 1226 additions and 743 deletions

42
Jenkinsfile vendored
View File

@ -175,7 +175,47 @@ pipeline {
}
}
stage('arm64_build'){
agent{label 'arm64'}
steps{
sh '''
cd ${WK}
git fetch
git checkout develop
git pull
cd ${WKC}
git fetch
git checkout develop
git pull
git submodule update
cd ${WKC}/packaging
./release.sh -v cluster -c aarch64 -n 2.0.0.0 -m 2.0.0.0
'''
}
}
stage('arm32_build'){
agent{label 'arm32'}
steps{
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WK}
git fetch
git checkout develop
git pull
cd ${WKC}
git fetch
git checkout develop
git pull
git submodule update
cd ${WKC}/packaging
./release.sh -v cluster -c aarch32 -n 2.0.0.0 -m 2.0.0.0
'''
}
}
}
}
}

View File

@ -142,7 +142,7 @@ C/C++的API类似于MySQL的C API。应用程序使用时需要包含TDengine
获取最近一次API调用失败的原因返回值为错误代码。
**注意**:对于单个数据库连接在同一时刻只能有一个线程使用该连接调用API否则会有未定义的行为出现并可能导致客户端crash。客户端应用可以通过建立多个连接进行多线程的数据写入或查询处理
**注意**:对于每个数据库应用2.0及以上版本 TDengine 推荐只建立一个连接。同时在应用中将该连接 (TAOS*) 结构体传递到不同的线程共享使用。基于 TAOS 结构体发出的查询、写入等操作具有多线程安全性。C 语言的连接器可以按照需求动态建立面向数据库的新连接(该过程对用户不可见),同时建议只有在程序最后退出的时候才调用 taos_close 关闭连接
### 异步查询API

View File

@ -236,7 +236,7 @@
# httpDebugFlag 131
# debug flag for monitor
# monitorDebugFlag 131
# monDebugFlag 131
# debug flag for query
# qDebugflag 131

57
src/balance/inc/bnInt.h Normal file
View File

@ -0,0 +1,57 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_BALANCE_INT_H
#define TDENGINE_BALANCE_INT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "mnodeInt.h"
#include "mnodeDef.h"
#include "mnodeDnode.h"
typedef struct {
int32_t size;
int32_t maxSize;
SDnodeObj **list;
} SBnDnodes;
typedef struct {
void * timer;
bool stop;
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t thread;
} SBnThread;
typedef struct {
pthread_mutex_t mutex;
} SBnMgmt;
int32_t bnInit();
void bnCleanUp();
bool bnStart();
void bnCheckStatus();
void bnCheckModules();
extern SBnDnodes tsBnDnodes;
extern void *tsMnodeTmr;
#ifdef __cplusplus
}
#endif
#endif

34
src/balance/inc/bnScore.h Normal file
View File

@ -0,0 +1,34 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_BALANCE_SCORE_H
#define TDENGINE_BALANCE_SCORE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "bnInt.h"
void bnInitDnodes();
void bnCleanupDnodes();
void bnAccquireDnodes();
void bnReleaseDnodes();
float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extraVnode);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,33 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_BALANCE_THREAD_H
#define TDENGINE_BALANCE_THREAD_H
#ifdef __cplusplus
extern "C" {
#endif
#include "bnInt.h"
int32_t bnInitThread();
void bnCleanupThread();
void bnNotify();
void bnStartTimer(int64_t mseconds);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -15,17 +15,12 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tutil.h"
#include "tbalance.h"
#include "tsync.h"
#include "ttimer.h"
#include "tglobal.h"
#include "tdataformat.h"
#include "dnode.h"
#include "mnode.h"
#include "mnodeDef.h"
#include "mnodeInt.h"
#include "mnodeDnode.h"
#include "bnInt.h"
#include "bnScore.h"
#include "bnThread.h"
#include "mnodeDb.h"
#include "mnodeMnode.h"
#include "mnodeSdb.h"
@ -33,36 +28,18 @@
#include "mnodeUser.h"
#include "mnodeVgroup.h"
/*
* once sdb work as mater, then tsAccessSquence reset to zero
* increase tsAccessSquence every balance interval
*/
extern void * tsMnodeTmr;
static void * tsBalanceTimer = NULL;
static int32_t tsBalanceDnodeListSize = 0;
static SDnodeObj ** tsBalanceDnodeList = NULL;
static int32_t tsBalanceDnodeListMallocSize = 16;
static pthread_mutex_t tsBalanceMutex;
static SBnMgmt tsBnMgmt;;
static void bnMonitorDnodeModule();
static void balanceStartTimer(int64_t mseconds);
static void balanceInitDnodeList();
static void balanceCleanupDnodeList();
static void balanceAccquireDnodeList();
static void balanceReleaseDnodeList();
static void balanceMonitorDnodeModule();
static float balanceTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extraVnode);
static int32_t balanceGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void balanceLock() {
pthread_mutex_lock(&tsBalanceMutex);
static void bnLock() {
pthread_mutex_lock(&tsBnMgmt.mutex);
}
static void balanceUnLock() {
pthread_mutex_unlock(&tsBalanceMutex);
static void bnUnLock() {
pthread_mutex_unlock(&tsBnMgmt.mutex);
}
static bool balanceCheckFree(SDnodeObj *pDnode) {
static bool bnCheckFree(SDnodeObj *pDnode) {
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) {
mError("dnode:%d, status:%s not available", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status));
return false;
@ -86,7 +63,7 @@ static bool balanceCheckFree(SDnodeObj *pDnode) {
return true;
}
static void balanceDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) {
static void bnDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) {
mDebug("vgId:%d, dnode:%d is dropping", pVgroup->vgId, pVnodeGid->dnodeId);
SDnodeObj *pDnode = mnodeGetDnode(pVnodeGid->dnodeId);
@ -111,27 +88,26 @@ static void balanceDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) {
mnodeUpdateVgroup(pVgroup);
}
static void balanceSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) {
static void bnSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) {
// SVnodeGid tmp = *pVnodeGid1;
// *pVnodeGid1 = *pVnodeGid2;
// *pVnodeGid2 = tmp;
}
int32_t balanceAllocVnodes(SVgObj *pVgroup) {
int32_t bnAllocVnodes(SVgObj *pVgroup) {
static int32_t randIndex = 0;
int32_t dnode = 0;
int32_t vnodes = 0;
balanceLock();
balanceAccquireDnodeList();
bnLock();
bnAccquireDnodes();
mDebug("db:%s, try alloc %d vnodes to vgroup, dnodes total:%d, avail:%d", pVgroup->dbName, pVgroup->numOfVnodes,
mnodeGetDnodesNum(), tsBalanceDnodeListSize);
mnodeGetDnodesNum(), tsBnDnodes.size);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
for (; dnode < tsBalanceDnodeListSize; ++dnode) {
SDnodeObj *pDnode = tsBalanceDnodeList[dnode];
if (balanceCheckFree(pDnode)) {
for (; dnode < tsBnDnodes.size; ++dnode) {
SDnodeObj *pDnode = tsBnDnodes.list[dnode];
if (bnCheckFree(pDnode)) {
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i;
pVnodeGid->dnodeId = pDnode->dnodeId;
pVnodeGid->pDnode = pDnode;
@ -148,8 +124,8 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
}
if (vnodes != pVgroup->numOfVnodes) {
balanceReleaseDnodeList();
balanceUnLock();
bnReleaseDnodes();
bnUnLock();
mDebug("db:%s, need vnodes:%d, but alloc:%d", pVgroup->dbName, pVgroup->numOfVnodes, vnodes);
@ -179,33 +155,33 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
if (pVgroup->numOfVnodes == 1) {
} else if (pVgroup->numOfVnodes == 2) {
if (randIndex++ % 2 == 0) {
balanceSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1);
bnSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1);
}
} else {
int32_t randVal = randIndex++ % 6;
if (randVal == 1) { // 1, 0, 2
balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
} else if (randVal == 2) { // 1, 2, 0
balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
} else if (randVal == 3) { // 2, 1, 0
balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
} else if (randVal == 4) { // 2, 0, 1
balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
}
if (randVal == 5) { // 0, 2, 1
balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
} else {
} // 0, 1, 2
}
balanceReleaseDnodeList();
balanceUnLock();
bnReleaseDnodes();
bnUnLock();
return TSDB_CODE_SUCCESS;
}
static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
if (pVgroup->lbTime + 5 * tsStatusInterval > tsAccessSquence) {
return false;
}
@ -232,7 +208,7 @@ static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
* desc: remove one vnode from vgroup
* all vnodes in vgroup should in ready state, except the balancing one
**/
static int32_t balanceRemoveVnode(SVgObj *pVgroup) {
static int32_t bnRemoveVnode(SVgObj *pVgroup) {
if (pVgroup->numOfVnodes <= 1) return -1;
SVnodeGid *pRmVnode = NULL;
@ -274,17 +250,17 @@ static int32_t balanceRemoveVnode(SVgObj *pVgroup) {
pSelVnode = pRmVnode;
}
if (!balanceCheckVgroupReady(pVgroup, pSelVnode)) {
if (!bnCheckVgroupReady(pVgroup, pSelVnode)) {
mDebug("vgId:%d, is not ready", pVgroup->vgId);
return -1;
} else {
mDebug("vgId:%d, is ready, discard dnode:%d", pVgroup->vgId, pSelVnode->dnodeId);
balanceDiscardVnode(pVgroup, pSelVnode);
bnDiscardVnode(pVgroup, pSelVnode);
return TSDB_CODE_SUCCESS;
}
}
static bool balanceCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) {
static bool bnCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pGid = &pVgroup->vnodeGid[i];
if (pGid->dnodeId == 0) break;
@ -299,13 +275,13 @@ static bool balanceCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) {
/**
* desc: add vnode to vgroup, find a new one if dest dnode is null
**/
static int32_t balanceAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
if (pDestDnode == NULL) {
for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) {
SDnodeObj *pDnode = tsBalanceDnodeList[i];
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
SDnodeObj *pDnode = tsBnDnodes.list[i];
if (pDnode == pSrcDnode) continue;
if (balanceCheckDnodeInVgroup(pDnode, pVgroup)) continue;
if (!balanceCheckFree(pDnode)) continue;
if (bnCheckDnodeInVgroup(pDnode, pVgroup)) continue;
if (!bnCheckFree(pDnode)) continue;
pDestDnode = pDnode;
mDebug("vgId:%d, add vnode to dnode:%d", pVgroup->vgId, pDnode->dnodeId);
@ -333,25 +309,25 @@ static int32_t balanceAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj
return TSDB_CODE_SUCCESS;
}
static bool balanceMonitorBalance() {
if (tsBalanceDnodeListSize < 2) return false;
static bool bnMonitorBalance() {
if (tsBnDnodes.size < 2) return false;
for (int32_t src = tsBalanceDnodeListSize - 1; src >= 0; --src) {
SDnodeObj *pDnode = tsBalanceDnodeList[src];
mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBalanceDnodeListSize - src - 1,
for (int32_t src = tsBnDnodes.size - 1; src >= 0; --src) {
SDnodeObj *pDnode = tsBnDnodes.list[src];
mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBnDnodes.size - src - 1,
pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status), pDnode->score, pDnode->numOfCores,
pDnode->openVnodes);
}
float scoresDiff = tsBalanceDnodeList[tsBalanceDnodeListSize - 1]->score - tsBalanceDnodeList[0]->score;
float scoresDiff = tsBnDnodes.list[tsBnDnodes.size - 1]->score - tsBnDnodes.list[0]->score;
if (scoresDiff < 0.01) {
mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBalanceDnodeListSize, scoresDiff);
mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBnDnodes.size, scoresDiff);
return false;
}
for (int32_t src = tsBalanceDnodeListSize - 1; src > 0; --src) {
SDnodeObj *pSrcDnode = tsBalanceDnodeList[src];
float srcScore = balanceTryCalcDnodeScore(pSrcDnode, -1);
for (int32_t src = tsBnDnodes.size - 1; src > 0; --src) {
SDnodeObj *pSrcDnode = tsBnDnodes.list[src];
float srcScore = bnTryCalcDnodeScore(pSrcDnode, -1);
if (tsEnableBalance == 0 && pSrcDnode->status != TAOS_DN_STATUS_DROPPING) {
continue;
}
@ -362,19 +338,19 @@ static bool balanceMonitorBalance() {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (balanceCheckDnodeInVgroup(pSrcDnode, pVgroup)) {
if (bnCheckDnodeInVgroup(pSrcDnode, pVgroup)) {
for (int32_t dest = 0; dest < src; dest++) {
SDnodeObj *pDestDnode = tsBalanceDnodeList[dest];
if (balanceCheckDnodeInVgroup(pDestDnode, pVgroup)) continue;
SDnodeObj *pDestDnode = tsBnDnodes.list[dest];
if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) continue;
float destScore = balanceTryCalcDnodeScore(pDestDnode, 1);
float destScore = bnTryCalcDnodeScore(pDestDnode, 1);
if (srcScore + 0.0001 < destScore) continue;
if (!balanceCheckFree(pDestDnode)) continue;
if (!bnCheckFree(pDestDnode)) continue;
mDebug("vgId:%d, balance from dnode:%d to dnode:%d, srcScore:%.1f:%.1f, destScore:%.1f:%.1f",
pVgroup->vgId, pSrcDnode->dnodeId, pDestDnode->dnodeId, pSrcDnode->score,
srcScore, pDestDnode->score, destScore);
balanceAddVnode(pVgroup, pSrcDnode, pDestDnode);
bnAddVnode(pVgroup, pSrcDnode, pDestDnode);
mnodeDecVgroupRef(pVgroup);
mnodeCancelGetNextVgroup(pIter);
return true;
@ -392,7 +368,7 @@ static bool balanceMonitorBalance() {
// 1. reset balanceAccessSquence to zero
// 2. reset state of dnodes to offline
// 3. reset lastAccess of dnodes to zero
void balanceReset() {
void bnReset() {
void * pIter = NULL;
SDnodeObj *pDnode = NULL;
while (1) {
@ -413,7 +389,7 @@ void balanceReset() {
tsAccessSquence = 0;
}
static int32_t balanceMonitorVgroups() {
static int32_t bnMonitorVgroups() {
void * pIter = NULL;
SVgObj *pVgroup = NULL;
bool hasUpdatingVgroup = false;
@ -429,11 +405,11 @@ static int32_t balanceMonitorVgroups() {
if (vgReplica > dbReplica) {
mInfo("vgId:%d, replica:%d numOfVnodes:%d, try remove one vnode", pVgroup->vgId, dbReplica, vgReplica);
hasUpdatingVgroup = true;
code = balanceRemoveVnode(pVgroup);
code = bnRemoveVnode(pVgroup);
} else if (vgReplica < dbReplica) {
mInfo("vgId:%d, replica:%d numOfVnodes:%d, try add one vnode", pVgroup->vgId, dbReplica, vgReplica);
hasUpdatingVgroup = true;
code = balanceAddVnode(pVgroup, NULL, NULL);
code = bnAddVnode(pVgroup, NULL, NULL);
}
mnodeDecVgroupRef(pVgroup);
@ -446,7 +422,7 @@ static int32_t balanceMonitorVgroups() {
return hasUpdatingVgroup;
}
static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) {
static bool bnMonitorDnodeDropping(SDnodeObj *pDnode) {
mDebug("dnode:%d, in dropping state", pDnode->dnodeId);
void * pIter = NULL;
@ -456,7 +432,7 @@ static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
hasThisDnode = balanceCheckDnodeInVgroup(pDnode, pVgroup);
hasThisDnode = bnCheckDnodeInVgroup(pDnode, pVgroup);
mnodeDecVgroupRef(pVgroup);
if (hasThisDnode) {
@ -474,7 +450,7 @@ static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) {
return false;
}
static bool balanceMontiorDropping() {
static bool bnMontiorDropping() {
void *pIter = NULL;
SDnodeObj *pDnode = NULL;
@ -499,7 +475,7 @@ static bool balanceMontiorDropping() {
}
if (pDnode->status == TAOS_DN_STATUS_DROPPING) {
bool ret = balanceMonitorDnodeDropping(pDnode);
bool ret = bnMonitorDnodeDropping(pDnode);
mnodeDecDnodeRef(pDnode);
mnodeCancelGetNextDnode(pIter);
return ret;
@ -509,33 +485,31 @@ static bool balanceMontiorDropping() {
return false;
}
static bool balanceStart() {
bool bnStart() {
if (!sdbIsMaster()) return false;
balanceLock();
bnLock();
bnAccquireDnodes();
balanceAccquireDnodeList();
bnMonitorDnodeModule();
balanceMonitorDnodeModule();
bool updateSoon = balanceMontiorDropping();
bool updateSoon = bnMontiorDropping();
if (!updateSoon) {
updateSoon = balanceMonitorVgroups();
updateSoon = bnMonitorVgroups();
}
if (!updateSoon) {
updateSoon = balanceMonitorBalance();
updateSoon = bnMonitorBalance();
}
balanceReleaseDnodeList();
balanceUnLock();
bnReleaseDnodes();
bnUnLock();
return updateSoon;
}
static void balanceSetVgroupOffline(SDnodeObj* pDnode) {
static void bnSetVgroupOffline(SDnodeObj* pDnode) {
void *pIter = NULL;
while (1) {
SVgObj *pVgroup;
@ -551,7 +525,7 @@ static void balanceSetVgroupOffline(SDnodeObj* pDnode) {
}
}
static void balanceCheckDnodeAccess() {
void bnCheckStatus() {
void * pIter = NULL;
SDnodeObj *pDnode = NULL;
@ -564,84 +538,39 @@ static void balanceCheckDnodeAccess() {
pDnode->offlineReason = TAOS_DN_OFF_STATUS_MSG_TIMEOUT;
mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence,
pDnode->lastAccess, pDnode->status);
balanceSetVgroupOffline(pDnode);
bnSetVgroupOffline(pDnode);
}
}
mnodeDecDnodeRef(pDnode);
}
}
static void balanceProcessBalanceTimer(void *handle, void *tmrId) {
if (!sdbIsMaster()) return;
tsBalanceTimer = NULL;
tsAccessSquence ++;
balanceCheckDnodeAccess();
bool updateSoon = false;
if (handle == NULL) {
if (tsAccessSquence % tsBalanceInterval == 0) {
mDebug("balance function is scheduled by timer");
updateSoon = balanceStart();
}
} else {
int64_t mseconds = (int64_t)handle;
mDebug("balance function is scheduled by event for %" PRId64 " mseconds arrived", mseconds);
updateSoon = balanceStart();
}
if (updateSoon) {
balanceStartTimer(1000);
} else {
taosTmrReset(balanceProcessBalanceTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBalanceTimer);
}
}
static void balanceStartTimer(int64_t mseconds) {
taosTmrReset(balanceProcessBalanceTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBalanceTimer);
}
void balanceSyncNotify() {
void bnCheckModules() {
if (sdbIsMaster()) {
balanceLock();
balanceAccquireDnodeList();
balanceMonitorDnodeModule();
balanceReleaseDnodeList();
balanceUnLock();
bnLock();
bnAccquireDnodes();
bnMonitorDnodeModule();
bnReleaseDnodes();
bnUnLock();
}
}
void balanceAsyncNotify() {
balanceStartTimer(500);
}
int32_t balanceInit() {
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_SCORES, balanceGetScoresMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_SCORES, balanceRetrieveScores);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_SCORES, mnodeCancelGetNextDnode);
pthread_mutex_init(&tsBalanceMutex, NULL);
balanceInitDnodeList();
balanceStartTimer(2000);
mDebug("balance start fp:%p initialized", balanceProcessBalanceTimer);
balanceReset();
int32_t bnInit() {
pthread_mutex_init(&tsBnMgmt.mutex, NULL);
bnInitDnodes();
bnInitThread();
bnReset();
return 0;
}
void balanceCleanUp() {
if (tsBalanceTimer != NULL) {
taosTmrStopA(&tsBalanceTimer);
pthread_mutex_destroy(&tsBalanceMutex);
tsBalanceTimer = NULL;
mDebug("stop balance timer");
}
balanceCleanupDnodeList();
void bnCleanUp() {
bnCleanupThread();
bnCleanupDnodes();
pthread_mutex_destroy(&tsBnMgmt.mutex);
}
int32_t balanceDropDnode(SDnodeObj *pDnode) {
int32_t bnDropDnode(SDnodeObj *pDnode) {
int32_t totalFreeVnodes = 0;
void * pIter = NULL;
SDnodeObj *pTempDnode = NULL;
@ -650,7 +579,7 @@ int32_t balanceDropDnode(SDnodeObj *pDnode) {
pIter = mnodeGetNextDnode(pIter, &pTempDnode);
if (pTempDnode == NULL) break;
if (pTempDnode != pDnode && balanceCheckFree(pTempDnode)) {
if (pTempDnode != pDnode && bnCheckFree(pTempDnode)) {
totalFreeVnodes += (TSDB_MAX_VNODES - pTempDnode->openVnodes);
}
@ -665,298 +594,17 @@ int32_t balanceDropDnode(SDnodeObj *pDnode) {
pDnode->status = TAOS_DN_STATUS_DROPPING;
mnodeUpdateDnode(pDnode);
balanceStartTimer(1100);
bnStartTimer(1100);
return TSDB_CODE_SUCCESS;
}
static int32_t balanceCalcCpuScore(SDnodeObj *pDnode) {
if (pDnode->cpuAvgUsage < 80)
return 0;
else if (pDnode->cpuAvgUsage < 90)
return 10;
else
return 50;
}
static int32_t balanceCalcMemoryScore(SDnodeObj *pDnode) {
if (pDnode->memoryAvgUsage < 80)
return 0;
else if (pDnode->memoryAvgUsage < 90)
return 10;
else
return 50;
}
static int32_t balanceCalcDiskScore(SDnodeObj *pDnode) {
if (pDnode->diskAvgUsage < 80)
return 0;
else if (pDnode->diskAvgUsage < 90)
return 10;
else
return 50;
}
static int32_t balanceCalcBandwidthScore(SDnodeObj *pDnode) {
if (pDnode->bandwidthUsage < 30)
return 0;
else if (pDnode->bandwidthUsage < 80)
return 10;
else
return 50;
}
static float balanceCalcModuleScore(SDnodeObj *pDnode) {
if (pDnode->numOfCores <= 0) return 0;
if (pDnode->isMgmt) {
return (float)tsMnodeEqualVnodeNum / pDnode->numOfCores;
}
return 0;
}
static float balanceCalcVnodeScore(SDnodeObj *pDnode, int32_t extra) {
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) return 100000000;
if (pDnode->numOfCores <= 0) return 0;
return (float)(pDnode->openVnodes + extra) / pDnode->numOfCores;
}
/**
* calc singe score, such as cpu/memory/disk/bandwitdh/vnode
* 1. get the score config
* 2. if the value is out of range, use border data
* 3. otherwise use interpolation method
**/
void balanceCalcDnodeScore(SDnodeObj *pDnode) {
pDnode->score = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) +
balanceCalcBandwidthScore(pDnode) + balanceCalcModuleScore(pDnode) +
balanceCalcVnodeScore(pDnode, 0) + pDnode->customScore;
}
float balanceTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extra) {
int32_t systemScore = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) +
balanceCalcBandwidthScore(pDnode);
float moduleScore = balanceCalcModuleScore(pDnode);
float vnodeScore = balanceCalcVnodeScore(pDnode, extra);
float score = systemScore + moduleScore + vnodeScore + pDnode->customScore;
return score;
}
static void balanceInitDnodeList() {
tsBalanceDnodeList = calloc(tsBalanceDnodeListMallocSize, sizeof(SDnodeObj *));
}
static void balanceCleanupDnodeList() {
if (tsBalanceDnodeList != NULL) {
free(tsBalanceDnodeList);
tsBalanceDnodeList = NULL;
}
}
static void balanceCheckDnodeListSize(int32_t dnodesNum) {
if (tsBalanceDnodeListMallocSize <= dnodesNum) {
tsBalanceDnodeListMallocSize = dnodesNum * 2;
tsBalanceDnodeList = realloc(tsBalanceDnodeList, tsBalanceDnodeListMallocSize * sizeof(SDnodeObj *));
}
}
void balanceAccquireDnodeList() {
int32_t dnodesNum = mnodeGetDnodesNum();
balanceCheckDnodeListSize(dnodesNum);
void * pIter = NULL;
SDnodeObj *pDnode = NULL;
int32_t dnodeIndex = 0;
while (1) {
if (dnodeIndex >= dnodesNum) {
mnodeCancelGetNextDnode(pIter);
break;
}
pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
mnodeDecDnodeRef(pDnode);
continue;
}
balanceCalcDnodeScore(pDnode);
int32_t orderIndex = dnodeIndex;
for (; orderIndex > 0; --orderIndex) {
if (pDnode->score > tsBalanceDnodeList[orderIndex - 1]->score) {
break;
}
tsBalanceDnodeList[orderIndex] = tsBalanceDnodeList[orderIndex - 1];
}
tsBalanceDnodeList[orderIndex] = pDnode;
dnodeIndex++;
}
tsBalanceDnodeListSize = dnodeIndex;
}
void balanceReleaseDnodeList() {
for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) {
SDnodeObj *pDnode = tsBalanceDnodeList[i];
if (pDnode != NULL) {
mnodeDecDnodeRef(pDnode);
}
}
}
static int32_t balanceGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SUserObj *pUser = mnodeGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->pAcct->user, "root") != 0) {
mnodeDecUserRef(pUser);
return TSDB_CODE_MND_NO_RIGHTS;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "system scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "custom scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "module scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "vnode scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "total scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "open vnodes");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "cpu cores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 18 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "balance state");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = mnodeGetDnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pIter = NULL;
mnodeDecUserRef(pUser);
return 0;
}
static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int32_t cols = 0;
while (numOfRows < rows) {
pShow->pIter = mnodeGetNextDnode(pShow->pIter, &pDnode);
if (pDnode == NULL) break;
int32_t systemScore = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) +
balanceCalcBandwidthScore(pDnode);
float moduleScore = balanceCalcModuleScore(pDnode);
float vnodeScore = balanceCalcVnodeScore(pDnode, 0);
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->dnodeId;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = systemScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = pDnode->customScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (int32_t)moduleScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (int32_t)vnodeScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (int32_t)(vnodeScore + moduleScore + pDnode->customScore + systemScore);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDnode->openVnodes;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDnode->numOfCores;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status));
cols++;
numOfRows++;
mnodeDecDnodeRef(pDnode);
}
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows;
return numOfRows;
}
static void balanceMonitorDnodeModule() {
static void bnMonitorDnodeModule() {
int32_t numOfMnodes = mnodeGetMnodesNum();
if (numOfMnodes >= tsNumOfMnodes) return;
for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) {
SDnodeObj *pDnode = tsBalanceDnodeList[i];
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
SDnodeObj *pDnode = tsBnDnodes.list[i];
if (pDnode == NULL) break;
if (pDnode->isMgmt || pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) {
@ -980,7 +628,7 @@ static void balanceMonitorDnodeModule() {
}
}
int32_t balanceAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t dnodeId) {
int32_t bnAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t dnodeId) {
if (!sdbIsMaster()) {
mError("dnode:%d, failed to alter vgId:%d to dnode:%d, for self not master", pSrcDnode->dnodeId, vnodeId, dnodeId);
return TSDB_CODE_MND_DNODE_NOT_EXIST;
@ -1004,29 +652,29 @@ int32_t balanceAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t
return TSDB_CODE_MND_DNODE_NOT_EXIST;
}
balanceLock();
balanceAccquireDnodeList();
bnLock();
bnAccquireDnodes();
int32_t code = TSDB_CODE_SUCCESS;
if (!balanceCheckDnodeInVgroup(pSrcDnode, pVgroup)) {
if (!bnCheckDnodeInVgroup(pSrcDnode, pVgroup)) {
mError("dnode:%d, failed to alter vgId:%d to dnode:%d, vgroup not in dnode:%d", pSrcDnode->dnodeId, vnodeId,
dnodeId, pSrcDnode->dnodeId);
code = TSDB_CODE_MND_VGROUP_NOT_IN_DNODE;
} else if (balanceCheckDnodeInVgroup(pDestDnode, pVgroup)) {
} else if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) {
mError("dnode:%d, failed to alter vgId:%d to dnode:%d, vgroup already in dnode:%d", pSrcDnode->dnodeId, vnodeId,
dnodeId, dnodeId);
code = TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE;
} else if (!balanceCheckFree(pDestDnode)) {
} else if (!bnCheckFree(pDestDnode)) {
mError("dnode:%d, failed to alter vgId:%d to dnode:%d, for dnode:%d not free", pSrcDnode->dnodeId, vnodeId, dnodeId,
dnodeId);
code = TSDB_CODE_MND_DNODE_NOT_FREE;
} else {
code = balanceAddVnode(pVgroup, pSrcDnode, pDestDnode);
code = bnAddVnode(pVgroup, pSrcDnode, pDestDnode);
mInfo("dnode:%d, alter vgId:%d to dnode:%d, result:%s", pSrcDnode->dnodeId, vnodeId, dnodeId, tstrerror(code));
}
balanceReleaseDnodeList();
balanceUnLock();
bnReleaseDnodes();
bnUnLock();
mnodeDecVgroupRef(pVgroup);
mnodeDecDnodeRef(pDestDnode);

312
src/balance/src/bnScore.c Normal file
View File

@ -0,0 +1,312 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "mnodeShow.h"
#include "mnodeUser.h"
#include "bnScore.h"
SBnDnodes tsBnDnodes;
static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t bnCalcCpuScore(SDnodeObj *pDnode) {
if (pDnode->cpuAvgUsage < 80)
return 0;
else if (pDnode->cpuAvgUsage < 90)
return 10;
else
return 50;
}
static int32_t bnCalcMemoryScore(SDnodeObj *pDnode) {
if (pDnode->memoryAvgUsage < 80)
return 0;
else if (pDnode->memoryAvgUsage < 90)
return 10;
else
return 50;
}
static int32_t bnCalcDiskScore(SDnodeObj *pDnode) {
if (pDnode->diskAvgUsage < 80)
return 0;
else if (pDnode->diskAvgUsage < 90)
return 10;
else
return 50;
}
static int32_t bnCalcBandScore(SDnodeObj *pDnode) {
if (pDnode->bandwidthUsage < 30)
return 0;
else if (pDnode->bandwidthUsage < 80)
return 10;
else
return 50;
}
static float bnCalcModuleScore(SDnodeObj *pDnode) {
if (pDnode->numOfCores <= 0) return 0;
if (pDnode->isMgmt) {
return (float)tsMnodeEqualVnodeNum / pDnode->numOfCores;
}
return 0;
}
static float bnCalcVnodeScore(SDnodeObj *pDnode, int32_t extra) {
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) return 100000000;
if (pDnode->numOfCores <= 0) return 0;
return (float)(pDnode->openVnodes + extra) / pDnode->numOfCores;
}
/**
* calc singe score, such as cpu/memory/disk/bandwitdh/vnode
* 1. get the score config
* 2. if the value is out of range, use border data
* 3. otherwise use interpolation method
**/
static void bnCalcDnodeScore(SDnodeObj *pDnode) {
pDnode->score = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) +
bnCalcBandScore(pDnode) + bnCalcModuleScore(pDnode) + bnCalcVnodeScore(pDnode, 0) +
pDnode->customScore;
}
float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extra) {
int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) +
bnCalcBandScore(pDnode);
float moduleScore = bnCalcModuleScore(pDnode);
float vnodeScore = bnCalcVnodeScore(pDnode, extra);
float score = systemScore + moduleScore + vnodeScore + pDnode->customScore;
return score;
}
void bnInitDnodes() {
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_SCORES, bnGetScoresMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_SCORES, bnRetrieveScores);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_SCORES, mnodeCancelGetNextDnode);
memset(&tsBnDnodes, 0, sizeof(SBnDnodes));
tsBnDnodes.maxSize = 16;
tsBnDnodes.list = calloc(tsBnDnodes.maxSize, sizeof(SDnodeObj *));
}
void bnCleanupDnodes() {
if (tsBnDnodes.list != NULL) {
free(tsBnDnodes.list);
tsBnDnodes.list = NULL;
}
}
static void bnCheckDnodesSize(int32_t dnodesNum) {
if (tsBnDnodes.maxSize <= dnodesNum) {
tsBnDnodes.maxSize = dnodesNum * 2;
tsBnDnodes.list = realloc(tsBnDnodes.list, tsBnDnodes.maxSize * sizeof(SDnodeObj *));
}
}
void bnAccquireDnodes() {
int32_t dnodesNum = mnodeGetDnodesNum();
bnCheckDnodesSize(dnodesNum);
void * pIter = NULL;
SDnodeObj *pDnode = NULL;
int32_t dnodeIndex = 0;
while (1) {
if (dnodeIndex >= dnodesNum) {
mnodeCancelGetNextDnode(pIter);
break;
}
pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
mnodeDecDnodeRef(pDnode);
continue;
}
bnCalcDnodeScore(pDnode);
int32_t orderIndex = dnodeIndex;
for (; orderIndex > 0; --orderIndex) {
if (pDnode->score > tsBnDnodes.list[orderIndex - 1]->score) {
break;
}
tsBnDnodes.list[orderIndex] = tsBnDnodes.list[orderIndex - 1];
}
tsBnDnodes.list[orderIndex] = pDnode;
dnodeIndex++;
}
tsBnDnodes.size = dnodeIndex;
}
void bnReleaseDnodes() {
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
SDnodeObj *pDnode = tsBnDnodes.list[i];
if (pDnode != NULL) {
mnodeDecDnodeRef(pDnode);
}
}
}
static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SUserObj *pUser = mnodeGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->pAcct->user, "root") != 0) {
mnodeDecUserRef(pUser);
return TSDB_CODE_MND_NO_RIGHTS;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "system scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "custom scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "module scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "vnode scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
strcpy(pSchema[cols].name, "total scores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "open vnodes");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "cpu cores");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 18 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "balance state");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = mnodeGetDnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pIter = NULL;
mnodeDecUserRef(pUser);
return 0;
}
static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int32_t cols = 0;
while (numOfRows < rows) {
pShow->pIter = mnodeGetNextDnode(pShow->pIter, &pDnode);
if (pDnode == NULL) break;
int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) + bnCalcBandScore(pDnode);
float moduleScore = bnCalcModuleScore(pDnode);
float vnodeScore = bnCalcVnodeScore(pDnode, 0);
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->dnodeId;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = systemScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = pDnode->customScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (int32_t)moduleScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (int32_t)vnodeScore;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (int32_t)(vnodeScore + moduleScore + pDnode->customScore + systemScore);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDnode->openVnodes;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDnode->numOfCores;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status));
cols++;
numOfRows++;
mnodeDecDnodeRef(pDnode);
}
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows;
return numOfRows;
}

129
src/balance/src/bnThread.c Normal file
View File

@ -0,0 +1,129 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "ttimer.h"
#include "tglobal.h"
#include "mnodeSdb.h"
#include "bnThread.h"
static SBnThread tsBnThread;
static void *bnThreadFunc(void *arg) {
while (1) {
pthread_mutex_lock(&tsBnThread.mutex);
if (tsBnThread.stop) {
pthread_mutex_unlock(&tsBnThread.mutex);
break;
}
pthread_cond_wait(&tsBnThread.cond, &tsBnThread.mutex);
bool updateSoon = bnStart();
bnStartTimer(updateSoon ? 1000 : -1);
pthread_mutex_unlock(&(tsBnThread.mutex));
}
return NULL;
}
int32_t bnInitThread() {
memset(&tsBnThread, 0, sizeof(SBnThread));
tsBnThread.stop = false;
pthread_mutex_init(&tsBnThread.mutex, NULL);
pthread_cond_init(&tsBnThread.cond, NULL);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
int32_t ret = pthread_create(&tsBnThread.thread, &thattr, bnThreadFunc, NULL);
pthread_attr_destroy(&thattr);
if (ret != 0) {
mError("failed to create balance thread since %s", strerror(errno));
return -1;
}
bnStartTimer(2000);
mDebug("balance thread is created");
return 0;
}
void bnCleanupThread() {
mDebug("balance thread will be cleanup");
if (tsBnThread.timer != NULL) {
taosTmrStopA(&tsBnThread.timer);
tsBnThread.timer = NULL;
mDebug("stop balance timer");
}
pthread_mutex_lock(&tsBnThread.mutex);
tsBnThread.stop = true;
pthread_cond_signal(&tsBnThread.cond);
pthread_mutex_unlock(&(tsBnThread.mutex));
pthread_join(tsBnThread.thread, NULL);
pthread_cond_destroy(&tsBnThread.cond);
pthread_mutex_destroy(&tsBnThread.mutex);
}
static void bnPostSignal() {
pthread_mutex_lock(&tsBnThread.mutex);
pthread_cond_signal(&tsBnThread.cond);
pthread_mutex_unlock(&(tsBnThread.mutex));
}
/*
* once sdb work as mater, then tsAccessSquence reset to zero
* increase tsAccessSquence every balance interval
*/
static void bnProcessTimer(void *handle, void *tmrId) {
if (!sdbIsMaster()) return;
if (tsBnThread.stop) return;
tsBnThread.timer = NULL;
tsAccessSquence++;
bnCheckStatus();
bnStartTimer(-1);
if (handle == NULL) {
if (tsAccessSquence % tsBalanceInterval == 0) {
mDebug("balance function is scheduled by timer");
bnPostSignal();
}
} else {
int64_t mseconds = (int64_t)handle;
mDebug("balance function is scheduled by event for %" PRId64 " mseconds arrived", mseconds);
bnPostSignal();
}
}
void bnStartTimer(int64_t mseconds) {
if (tsBnThread.stop) return;
bool updateSoon = (mseconds != -1);
if (updateSoon) {
taosTmrReset(bnProcessTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBnThread.timer);
} else {
taosTmrReset(bnProcessTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBnThread.timer);
}
}
void bnNotify() {
bnStartTimer(500);
}

View File

@ -5103,7 +5103,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
const int tokenDebugFlagEnd = 20;
const SDNodeDynConfOption cfgOptions[] = {
{"resetLog", 8}, {"resetQueryCache", 15}, {"balance", 7}, {"monitor", 7},
{"debugFlag", 9}, {"monitorDebugFlag", 16}, {"vDebugFlag", 10}, {"mDebugFlag", 10},
{"debugFlag", 9}, {"monDebugFlag", 11}, {"vDebugFlag", 10}, {"mDebugFlag", 10},
{"cDebugFlag", 10}, {"httpDebugFlag", 13}, {"qDebugflag", 10}, {"sdbDebugFlag", 12},
{"uDebugFlag", 10}, {"tsdbDebugFlag", 13}, {"sDebugflag", 10}, {"rpcDebugFlag", 12},
{"dDebugFlag", 10}, {"mqttDebugFlag", 13}, {"wDebugFlag", 10}, {"tmrDebugFlag", 12},

View File

@ -2058,6 +2058,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
if (pConnect->epSet.numOfEps > 0) {
tscEpSetHtons(&pConnect->epSet);
tscUpdateMgmtEpSet(&pConnect->epSet);
for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
tscDebug("%p epSet.fqdn[%d]: %s, pObj:%p", pSql, i, pConnect->epSet.fqdn[i], pObj);
}
}
strcpy(pObj->sversion, pConnect->serverVersion);

View File

@ -157,7 +157,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
registerSqlObj(pSql);
code = tsParseSql(pSql, false);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem);
code = pSql->res.code;
@ -168,7 +168,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail;
}
if (pSql->cmd.command != TSDB_SQL_SELECT) {
if (pSql->cmd.command != TSDB_SQL_SELECT && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
line = __LINE__;
code = TSDB_CODE_TSC_INVALID_SQL;
goto fail;
@ -182,7 +182,7 @@ fail:
if (pSql->self != 0) {
taosReleaseRef(tscObjRef, pSql->self);
} else {
tscFreeSqlObj(pSql);
tscFreeSqlObj(pSql);
}
pSql = NULL;
@ -401,9 +401,11 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char
tscLoadSubscriptionProgress(pSub);
}
if (!tscUpdateSubscription(pObj, pSub)) {
taos_unsubscribe(pSub, 1);
return NULL;
if (pSub->pSql->cmd.command == TSDB_SQL_SELECT) {
if (!tscUpdateSubscription(pObj, pSub)) {
taos_unsubscribe(pSub, 1);
return NULL;
}
}
pSub->interval = interval;
@ -417,10 +419,80 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char
return pSub;
}
SSqlObj* recreateSqlObj(SSub* pSub) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
return NULL;
}
pSql->signature = pSql;
pSql->pTscObj = pSub->taos;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
tscFreeSqlObj(pSql);
return NULL;
}
pSql->param = pSub;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = asyncCallback;
pSql->fetchFp = asyncCallback;
pSql->sqlstr = strdup(pSub->pSql->sqlstr);
if (pSql->sqlstr == NULL) {
tscFreeSqlObj(pSql);
return NULL;
}
pRes->qhandle = 0;
pRes->numOfRows = 1;
int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
return NULL;
}
registerSqlObj(pSql);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
taosReleaseRef(tscObjRef, pSql->self);
return NULL;
}
if (pSql->cmd.command != TSDB_SQL_SELECT) {
taosReleaseRef(tscObjRef, pSql->self);
return NULL;
}
return pSql;
}
TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSub *pSub = (SSub *)tsub;
if (pSub == NULL) return NULL;
if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
SSqlObj* pSql = recreateSqlObj(pSub);
if (pSql == NULL) {
return NULL;
}
if (pSub->pSql->self != 0) {
taosReleaseRef(tscObjRef, pSub->pSql->self);
} else {
tscFreeSqlObj(pSub->pSql);
}
pSub->pSql = pSql;
pSql->pSubscription = pSub;
}
tscSaveSubscriptionProgress(pSub);
SSqlObj *pSql = pSub->pSql;
@ -512,10 +584,13 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
}
if (pSub->pSql != NULL) {
taos_free_result(pSub->pSql);
if (pSub->pSql->self != 0) {
taosReleaseRef(tscObjRef, pSub->pSql->self);
} else {
tscFreeSqlObj(pSub->pSql);
}
}
tscFreeSqlObj(pSub->pSql);
taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem);
memset(pSub, 0, sizeof(*pSub));

View File

@ -175,7 +175,7 @@ extern int32_t tmrDebugFlag;
extern int32_t sdbDebugFlag;
extern int32_t httpDebugFlag;
extern int32_t mqttDebugFlag;
extern int32_t monitorDebugFlag;
extern int32_t monDebugFlag;
extern int32_t uDebugFlag;
extern int32_t rpcDebugFlag;
extern int32_t odbcDebugFlag;

View File

@ -212,7 +212,7 @@ int32_t jniDebugFlag = 131;
int32_t odbcDebugFlag = 131;
int32_t httpDebugFlag = 131;
int32_t mqttDebugFlag = 131;
int32_t monitorDebugFlag = 131;
int32_t monDebugFlag = 131;
int32_t qDebugFlag = 131;
int32_t rpcDebugFlag = 131;
int32_t uDebugFlag = 131;
@ -223,9 +223,9 @@ int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 135;
int32_t fsDebugFlag = 135;
int32_t (*monitorStartSystemFp)() = NULL;
void (*monitorStopSystemFp)() = NULL;
void (*monitorExecuteSQLFp)(char *sql) = NULL;
int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL;
void (*monExecuteSQLFp)(char *sql) = NULL;
char *qtypeStr[] = {"rpc", "fwd", "wal", "cq", "query"};
@ -242,7 +242,7 @@ void taosSetAllDebugFlag() {
odbcDebugFlag = debugFlag;
httpDebugFlag = debugFlag;
mqttDebugFlag = debugFlag;
monitorDebugFlag = debugFlag;
monDebugFlag = debugFlag;
qDebugFlag = debugFlag;
rpcDebugFlag = debugFlag;
uDebugFlag = debugFlag;
@ -283,15 +283,15 @@ bool taosCfgDynamicOptions(char *msg) {
if (strncasecmp(cfg->option, "monitor", olen) == 0) {
if (1 == vint) {
if (monitorStartSystemFp) {
(*monitorStartSystemFp)();
if (monStartSystemFp) {
(*monStartSystemFp)();
uInfo("monitor is enabled");
} else {
uError("monitor can't be updated, for monitor not initialized");
}
} else {
if (monitorStopSystemFp) {
(*monitorStopSystemFp)();
if (monStopSystemFp) {
(*monStopSystemFp)();
uInfo("monitor is disabled");
} else {
uError("monitor can't be updated, for monitor not initialized");
@ -314,8 +314,8 @@ bool taosCfgDynamicOptions(char *msg) {
}
if (strncasecmp(option, "resetQueryCache", 15) == 0) {
if (monitorExecuteSQLFp) {
(*monitorExecuteSQLFp)("resetQueryCache");
if (monExecuteSQLFp) {
(*monExecuteSQLFp)("resetQueryCache");
uInfo("resetquerycache is executed");
} else {
uError("resetquerycache can't be executed, for monitor not started");
@ -1267,8 +1267,8 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "monitorDebugFlag";
cfg.ptr = &monitorDebugFlag;
cfg.option = "monDebugFlag";
cfg.ptr = &monDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG;
cfg.minValue = 0;

View File

@ -24,7 +24,7 @@
#include "tqueue.h"
#include "tsync.h"
#include "ttimer.h"
#include "tbalance.h"
#include "tbn.h"
#include "tglobal.h"
#include "dnode.h"
#include "vnode.h"

View File

@ -78,10 +78,10 @@ static void dnodeAllocModules() {
tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1);
tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].initFp = monInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monCleanupSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monStopSystem;
if (tsEnableMonitorModule) {
dnodeSetModuleStatus(TSDB_MOD_MONITOR);
}

View File

@ -47,13 +47,13 @@ typedef struct {
int8_t accessState;
} SAcctMonitorObj;
int32_t monitorInitSystem();
int32_t monitorStartSystem();
void monitorStopSystem();
void monitorCleanUpSystem();
void monitorSaveAcctLog(SAcctMonitorObj *pMonObj);
void monitorSaveLog(int32_t level, const char *const format, ...);
void monitorExecuteSQL(char *sql);
int32_t monInitSystem();
int32_t monStartSystem();
void monStopSystem();
void monCleanupSystem();
void monSaveAcctLog(SAcctMonitorObj *pMonObj);
void monSaveLog(int32_t level, const char *const format, ...);
void monExecuteSQL(char *sql);
#ifdef __cplusplus
}

View File

@ -23,14 +23,14 @@ extern "C" {
struct SVgObj;
struct SDnodeObj;
int32_t balanceInit();
void balanceCleanUp();
void balanceAsyncNotify();
void balanceSyncNotify();
void balanceReset();
int32_t balanceAllocVnodes(struct SVgObj *pVgroup);
int32_t balanceAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId);
int32_t balanceDropDnode(struct SDnodeObj *pDnode);
int32_t bnInit();
void bnCleanUp();
void bnNotify();
void bnCheckModules();
void bnReset();
int32_t bnAllocVnodes(struct SVgObj *pVgroup);
int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId);
int32_t bnDropDnode(struct SDnodeObj *pDnode);
#ifdef __cplusplus
}

View File

@ -41,9 +41,9 @@ extern int32_t sdbDebugFlag;
#define sdbDebug(...) { if (sdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
#define mLError(...) { monitorSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) }
#define mLWarn(...) { monitorSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
#define mLInfo(...) { monitorSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
#define mLError(...) { monSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) }
#define mLWarn(...) { monSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
#define mLInfo(...) { monSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
#ifdef __cplusplus
}

View File

@ -20,7 +20,7 @@
#include "tgrant.h"
#include "tglobal.h"
#include "tname.h"
#include "tbalance.h"
#include "tbn.h"
#include "tdataformat.h"
#include "mnode.h"
#include "mnodeDef.h"
@ -997,7 +997,7 @@ static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) {
mDebug("db:%s, all vgroups is altered", pDb->name);
mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
balanceAsyncNotify();
bnNotify();
return TSDB_CODE_SUCCESS;
}

View File

@ -16,12 +16,12 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tgrant.h"
#include "tbalance.h"
#include "tbn.h"
#include "tglobal.h"
#include "tconfig.h"
#include "tutil.h"
#include "tsocket.h"
#include "tbalance.h"
#include "tbn.h"
#include "tsync.h"
#include "tdataformat.h"
#include "mnode.h"
@ -112,7 +112,7 @@ static int32_t mnodeDnodeActionDelete(SSdbRow *pRow) {
SDnodeObj *pDnode = pRow->pObj;
mnodeDropMnodeLocal(pDnode->dnodeId);
balanceAsyncNotify();
bnNotify();
mnodeUpdateDnodeEps();
mDebug("dnode:%d, all vgroups is dropped from sdb", pDnode->dnodeId);
@ -344,7 +344,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_INVALID_DNODE_CFG_OPTION;
}
int32_t code = balanceAlterDnode(pDnode, vnodeId, dnodeId);
int32_t code = bnAlterDnode(pDnode, vnodeId, dnodeId);
mnodeDecDnodeRef(pDnode);
return code;
} else {
@ -588,8 +588,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
mInfo("dnode:%d, from offline to online", pDnode->dnodeId);
pDnode->status = TAOS_DN_STATUS_READY;
pDnode->offlineReason = TAOS_DN_OFF_ONLINE;
balanceSyncNotify();
balanceAsyncNotify();
bnCheckModules();
bnNotify();
}
if (openVnodes != pDnode->openVnodes) {
@ -702,7 +702,7 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) {
mInfo("dnode:%d, start to drop it", pDnode->dnodeId);
int32_t code = balanceDropDnode(pDnode);
int32_t code = bnDropDnode(pDnode);
mnodeDecDnodeRef(pDnode);
return code;
}

View File

@ -17,7 +17,7 @@
#include "os.h"
#include "taosdef.h"
#include "tsched.h"
#include "tbalance.h"
#include "tbn.h"
#include "tgrant.h"
#include "ttimer.h"
#include "tglobal.h"
@ -52,7 +52,7 @@ static SStep tsMnodeSteps[] = {
{"tables", mnodeInitTables, mnodeCleanupTables},
{"mnodes", mnodeInitMnodes, mnodeCleanupMnodes},
{"sdb", sdbInit, sdbCleanUp},
{"balance", balanceInit, balanceCleanUp},
{"balance", bnInit, bnCleanUp},
{"grant", grantInit, grantCleanUp},
{"show", mnodeInitShow, mnodeCleanUpShow}
};

View File

@ -19,7 +19,7 @@
#include "tglobal.h"
#include "trpc.h"
#include "tsync.h"
#include "tbalance.h"
#include "tbn.h"
#include "tutil.h"
#include "tsocket.h"
#include "tdataformat.h"

View File

@ -19,7 +19,7 @@
#include "tsched.h"
#include "tutil.h"
#include "tgrant.h"
#include "tbalance.h"
#include "tbn.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"

View File

@ -17,7 +17,7 @@
#include "os.h"
#include "taosdef.h"
#include "tsched.h"
#include "tbalance.h"
#include "tbn.h"
#include "tgrant.h"
#include "ttimer.h"
#include "tglobal.h"

View File

@ -19,7 +19,7 @@
#include "hash.h"
#include "tutil.h"
#include "tref.h"
#include "tbalance.h"
#include "tbn.h"
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
@ -244,7 +244,7 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) {
sdbInfo("vgId:1, mnode role changed from %s to %s", syncRole[tsSdbMgmt.role], syncRole[role]);
if (role == TAOS_SYNC_ROLE_MASTER && tsSdbMgmt.role != TAOS_SYNC_ROLE_MASTER) {
balanceReset();
bnReset();
}
tsSdbMgmt.role = role;

View File

@ -20,7 +20,7 @@
#include "tsocket.h"
#include "tidpool.h"
#include "tsync.h"
#include "tbalance.h"
#include "tbn.h"
#include "tglobal.h"
#include "tdataformat.h"
#include "dnode.h"
@ -563,7 +563,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
pVgroup->numOfVnodes = pDb->cfg.replications;
pVgroup->createdTime = taosGetTimestampMs();
pVgroup->accessState = TSDB_VN_ALL_ACCCESS;
int32_t code = balanceAllocVnodes(pVgroup);
int32_t code = bnAllocVnodes(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup, reason:%s", pDb->name, pVgroup->numOfVnodes,
tstrerror(code));

View File

@ -17,7 +17,7 @@
#include "os.h"
#include "taosdef.h"
#include "tsched.h"
#include "tbalance.h"
#include "tbn.h"
#include "tgrant.h"
#include "tglobal.h"
#include "trpc.h"

View File

@ -26,12 +26,12 @@
#include "monitor.h"
#include "taoserror.h"
#define mnFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }}
#define mnError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }}
#define mnWarn(...) { if (monitorDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }}
#define mnInfo(...) { if (monitorDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }}
#define mnDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define mnTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define monFatal(...) { if (monDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }}
#define monError(...) { if (monDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }}
#define monWarn(...) { if (monDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }}
#define monInfo(...) { if (monDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }}
#define monDebug(...) { if (monDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monDebugFlag, __VA_ARGS__); }}
#define monTrace(...) { if (monDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monDebugFlag, __VA_ARGS__); }}
#define SQL_LENGTH 1030
#define LOG_LEN_STR 100
@ -47,12 +47,12 @@ typedef enum {
MON_CMD_CREATE_TB_ACCT_ROOT,
MON_CMD_CREATE_TB_SLOWQUERY,
MON_CMD_MAX
} EMonitorCommand;
} EMonCmd;
typedef enum {
MON_STATE_NOT_INIT,
MON_STATE_INITED
} EMonitorState;
} EMonState;
typedef struct {
pthread_t thread;
@ -63,17 +63,17 @@ typedef struct {
int8_t start; // enable/disable by mnode
int8_t quiting; // taosd is quiting
char sql[SQL_LENGTH + 1];
} SMonitorConn;
} SMonConn;
static SMonitorConn tsMonitor = {0};
static void monitorSaveSystemInfo();
static void *monitorThreadFunc(void *param);
static void monitorBuildMonitorSql(char *sql, int32_t cmd);
extern int32_t (*monitorStartSystemFp)();
extern void (*monitorStopSystemFp)();
extern void (*monitorExecuteSQLFp)(char *sql);
static SMonConn tsMonitor = {0};
static void monSaveSystemInfo();
static void *monThreadFunc(void *param);
static void monBuildMonitorSql(char *sql, int32_t cmd);
extern int32_t (*monStartSystemFp)();
extern void (*monStopSystemFp)();
extern void (*monExecuteSQLFp)(char *sql);
int32_t monitorInitSystem() {
int32_t monInitSystem() {
if (tsMonitor.ep[0] == 0) {
strcpy(tsMonitor.ep, tsLocalEp);
}
@ -89,29 +89,29 @@ int32_t monitorInitSystem() {
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&tsMonitor.thread, &thAttr, monitorThreadFunc, NULL)) {
mnError("failed to create thread to for monitor module, reason:%s", strerror(errno));
if (pthread_create(&tsMonitor.thread, &thAttr, monThreadFunc, NULL)) {
monError("failed to create thread to for monitor module, reason:%s", strerror(errno));
return -1;
}
pthread_attr_destroy(&thAttr);
mnDebug("monitor thread is launched");
monDebug("monitor thread is launched");
monitorStartSystemFp = monitorStartSystem;
monitorStopSystemFp = monitorStopSystem;
monStartSystemFp = monStartSystem;
monStopSystemFp = monStopSystem;
return 0;
}
int32_t monitorStartSystem() {
int32_t monStartSystem() {
taos_init();
tsMonitor.start = 1;
monitorExecuteSQLFp = monitorExecuteSQL;
mnInfo("monitor module start");
monExecuteSQLFp = monExecuteSQL;
monInfo("monitor module start");
return 0;
}
static void *monitorThreadFunc(void *param) {
mnDebug("starting to initialize monitor module ...");
static void *monThreadFunc(void *param) {
monDebug("starting to initialize monitor module ...");
while (1) {
static int32_t accessTimes = 0;
@ -120,7 +120,7 @@ static void *monitorThreadFunc(void *param) {
if (tsMonitor.quiting) {
tsMonitor.state = MON_STATE_NOT_INIT;
mnInfo("monitor thread will quit, for taosd is quiting");
monInfo("monitor thread will quit, for taosd is quiting");
break;
} else {
taosGetDisk();
@ -131,7 +131,7 @@ static void *monitorThreadFunc(void *param) {
}
if (dnodeGetDnodeId() <= 0) {
mnDebug("dnode not initialized, waiting for 3000 ms to start monitor module");
monDebug("dnode not initialized, waiting for 3000 ms to start monitor module");
continue;
}
@ -139,10 +139,10 @@ static void *monitorThreadFunc(void *param) {
tsMonitor.state = MON_STATE_NOT_INIT;
tsMonitor.conn = taos_connect(NULL, "monitor", tsInternalPass, "", 0);
if (tsMonitor.conn == NULL) {
mnError("failed to connect to database, reason:%s", tstrerror(terrno));
monError("failed to connect to database, reason:%s", tstrerror(terrno));
continue;
} else {
mnDebug("connect to database success");
monDebug("connect to database success");
}
}
@ -150,16 +150,16 @@ static void *monitorThreadFunc(void *param) {
int code = 0;
for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) {
monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
monBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
mnError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code));
monError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code));
break;
} else {
mnDebug("successfully to exec sql:%s", tsMonitor.sql);
monDebug("successfully to exec sql:%s", tsMonitor.sql);
}
}
@ -170,16 +170,16 @@ static void *monitorThreadFunc(void *param) {
if (tsMonitor.state == MON_STATE_INITED) {
if (accessTimes % tsMonitorInterval == 0) {
monitorSaveSystemInfo();
monSaveSystemInfo();
}
}
}
mnInfo("monitor thread is stopped");
monInfo("monitor thread is stopped");
return NULL;
}
static void monitorBuildMonitorSql(char *sql, int32_t cmd) {
static void monBuildMonitorSql(char *sql, int32_t cmd) {
memset(sql, 0, SQL_LENGTH);
if (cmd == MON_CMD_CREATE_DB) {
@ -235,47 +235,47 @@ static void monitorBuildMonitorSql(char *sql, int32_t cmd) {
sql[SQL_LENGTH] = 0;
}
void monitorStopSystem() {
void monStopSystem() {
tsMonitor.start = 0;
tsMonitor.state = MON_STATE_NOT_INIT;
monitorExecuteSQLFp = NULL;
mnInfo("monitor module stopped");
monExecuteSQLFp = NULL;
monInfo("monitor module stopped");
}
void monitorCleanUpSystem() {
void monCleanupSystem() {
tsMonitor.quiting = 1;
monitorStopSystem();
monStopSystem();
pthread_join(tsMonitor.thread, NULL);
if (tsMonitor.conn != NULL) {
taos_close(tsMonitor.conn);
tsMonitor.conn = NULL;
}
mnInfo("monitor module is cleaned up");
monInfo("monitor module is cleaned up");
}
// unit is MB
static int32_t monitorBuildMemorySql(char *sql) {
static int32_t monBuildMemorySql(char *sql) {
float sysMemoryUsedMB = 0;
bool suc = taosGetSysMemory(&sysMemoryUsedMB);
if (!suc) {
mnDebug("failed to get sys memory info");
monDebug("failed to get sys memory info");
}
float procMemoryUsedMB = 0;
suc = taosGetProcMemory(&procMemoryUsedMB);
if (!suc) {
mnDebug("failed to get proc memory info");
monDebug("failed to get proc memory info");
}
return sprintf(sql, ", %f, %f, %d", procMemoryUsedMB, sysMemoryUsedMB, tsTotalMemoryMB);
}
// unit is %
static int32_t monitorBuildCpuSql(char *sql) {
static int32_t monBuildCpuSql(char *sql) {
float sysCpuUsage = 0, procCpuUsage = 0;
bool suc = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage);
if (!suc) {
mnDebug("failed to get cpu usage");
monDebug("failed to get cpu usage");
}
if (sysCpuUsage <= procCpuUsage) {
@ -286,72 +286,72 @@ static int32_t monitorBuildCpuSql(char *sql) {
}
// unit is GB
static int32_t monitorBuildDiskSql(char *sql) {
static int32_t monBuildDiskSql(char *sql) {
return sprintf(sql, ", %f, %d", (tsTotalDataDirGB - tsAvailDataDirGB), (int32_t)tsTotalDataDirGB);
}
// unit is Kb
static int32_t monitorBuildBandSql(char *sql) {
static int32_t monBuildBandSql(char *sql) {
float bandSpeedKb = 0;
bool suc = taosGetBandSpeed(&bandSpeedKb);
if (!suc) {
mnDebug("failed to get bandwidth speed");
monDebug("failed to get bandwidth speed");
}
return sprintf(sql, ", %f", bandSpeedKb);
}
static int32_t monitorBuildReqSql(char *sql) {
static int32_t monBuildReqSql(char *sql) {
SStatisInfo info = dnodeGetStatisInfo();
return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum);
}
static int32_t monitorBuildIoSql(char *sql) {
static int32_t monBuildIoSql(char *sql) {
float readKB = 0, writeKB = 0;
bool suc = taosGetProcIO(&readKB, &writeKB);
if (!suc) {
mnDebug("failed to get io info");
monDebug("failed to get io info");
}
return sprintf(sql, ", %f, %f", readKB, writeKB);
}
static void monitorSaveSystemInfo() {
static void monSaveSystemInfo() {
int64_t ts = taosGetTimestampUs();
char * sql = tsMonitor.sql;
int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dn%d values(%" PRId64, tsMonitorDbName, dnodeGetDnodeId(), ts);
pos += monitorBuildCpuSql(sql + pos);
pos += monitorBuildMemorySql(sql + pos);
pos += monitorBuildDiskSql(sql + pos);
pos += monitorBuildBandSql(sql + pos);
pos += monitorBuildIoSql(sql + pos);
pos += monitorBuildReqSql(sql + pos);
pos += monBuildCpuSql(sql + pos);
pos += monBuildMemorySql(sql + pos);
pos += monBuildDiskSql(sql + pos);
pos += monBuildBandSql(sql + pos);
pos += monBuildIoSql(sql + pos);
pos += monBuildReqSql(sql + pos);
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
mnError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql);
monError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql);
} else {
mnDebug("successfully to save system info, sql:%s", tsMonitor.sql);
monDebug("successfully to save system info, sql:%s", tsMonitor.sql);
}
}
static void montiorExecSqlCb(void *param, TAOS_RES *result, int32_t code) {
static void monExecSqlCb(void *param, TAOS_RES *result, int32_t code) {
int32_t c = taos_errno(result);
if (c != TSDB_CODE_SUCCESS) {
mnError("save %s failed, reason:%s", (char *)param, tstrerror(c));
monError("save %s failed, reason:%s", (char *)param, tstrerror(c));
} else {
int32_t rows = taos_affected_rows(result);
mnDebug("save %s succ, rows:%d", (char *)param, rows);
monDebug("save %s succ, rows:%d", (char *)param, rows);
}
taos_free_result(result);
}
void monitorSaveAcctLog(SAcctMonitorObj *pMon) {
void monSaveAcctLog(SAcctMonitorObj *pMon) {
if (tsMonitor.state != MON_STATE_INITED) return;
char sql[1024] = {0};
@ -381,11 +381,11 @@ void monitorSaveAcctLog(SAcctMonitorObj *pMon) {
pMon->totalConns, pMon->maxConns,
pMon->accessState);
mnDebug("save account info, sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "account info");
monDebug("save account info, sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "account info");
}
void monitorSaveLog(int32_t level, const char *const format, ...) {
void monSaveLog(int32_t level, const char *const format, ...) {
if (tsMonitor.state != MON_STATE_INITED) return;
va_list argpointer;
@ -402,13 +402,13 @@ void monitorSaveLog(int32_t level, const char *const format, ...) {
len += sprintf(sql + len, "', '%s')", tsLocalEp);
sql[len++] = 0;
mnDebug("save log, sql: %s", sql);
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "log");
monDebug("save log, sql: %s", sql);
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "log");
}
void monitorExecuteSQL(char *sql) {
void monExecuteSQL(char *sql) {
if (tsMonitor.state != MON_STATE_INITED) return;
mnDebug("execute sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "sql");
monDebug("execute sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "sql");
}

View File

@ -56,7 +56,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.8</version>
<version>2.0.12</version>
</dependency>
<dependency>
<groupId>log4j</groupId>

View File

@ -6,10 +6,24 @@ TDengine's JDBC demo project is organized in a Maven way so that users can easil
Make sure you have already installed a tdengine client on your current develop environment.
Download the tdengine package on our website: ``https://www.taosdata.com/cn/all-downloads/`` and install the client.
## How to run jdbcChecker
<pre>mvn clean compile exec:java -Dexec.mainClass="com.taosdata.example.JdbcChecker" -Dexec.args="-host localhost"</pre>
## How to run jdbcTaosDemo
run command:
<pre> mvn clean compile exec:java -Dexec.mainClass="com.taosdata.example.jdbcTaosdemo.JdbcTaosdemo"</pre>
and run with your customed args
<pre>mvn clean compile exec:java -Dexec.mainClass="com.taosdata.example.jdbcTaosdemo.JdbcTaosdemo" -Dexec.args="-host localhost"</pre>
## Compile the Demo Code and Run It
To compile the demo project, go to the source directory ``TDengine/tests/examples/JDBC/JDBCDemo`` and execute
<pre>mvn clean package assembly:single</pre>
<pre>
mvn clean package assembly:single
</pre>
The ``pom.xml`` is configured to package all the dependencies into one executable jar file.
To run it, go to ``examples/JDBC/JDBCDemo/target`` and execute
<pre>java -jar jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host localhost</pre>
<pre>java -jar jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host localhost</pre>

View File

@ -41,7 +41,7 @@ public class InsertTableTask implements Runnable {
long ts = start.toEpochMilli() + (j * timeGap);
// insert data into echo table
for (int i = startTbIndex; i < startTbIndex + tableNumber; i++) {
String sql = SqlSpeller.insertOneRowSQL(config.getDbName(), config.getTbPrefix(), i + 1, ts);
String sql = SqlSpeller.insertBatchSizeRowsSQL(config.getDbName(), config.getTbPrefix(), i + 1, ts, config.getNumberOfRecordsPerRequest());
logger.info(Thread.currentThread().getName() + ">>> " + sql);
Statement statement = connection.createStatement();
statement.execute(sql);

View File

@ -32,9 +32,9 @@ class TDTestCase:
tdSql.query("show databases")
tdSql.checkData(0, 14, 2)
tdSql.execute("alter database db keep 365")
tdSql.execute("alter database db keep 365,365,365")
tdSql.query("show databases")
tdSql.checkData(0, 7, "3650,3650,365")
tdSql.checkData(0, 7, "365,365,365")
tdSql.execute("alter database db quorum 2")
tdSql.query("show databases")

View File

@ -352,6 +352,12 @@ class ThreadCoordinator:
self._execStats.registerFailure("Broken DB Connection")
# continue # don't do that, need to tap all threads at
# end, and maybe signal them to stop
if isinstance(err, CrashGenError): # our own transition failure
Logging.info("State transition error")
traceback.print_stack()
transitionFailed = True
self._te = None # Not running any more
self._execStats.registerFailure("State transition error")
else:
raise
# return transitionFailed # Why did we have this??!!
@ -388,12 +394,20 @@ class ThreadCoordinator:
self._syncAtBarrier() # For now just cross the barrier
Progress.emit(Progress.END_THREAD_STEP)
except threading.BrokenBarrierError as err:
Logging.info("Main loop aborted, caused by worker thread(s) time-out")
self._execStats.registerFailure("Aborted due to worker thread timeout")
print("\n\nWorker Thread time-out detected, TAOS related threads are:")
Logging.error("\n")
Logging.error("Main loop aborted, caused by worker thread(s) time-out of {} seconds".format(
ThreadCoordinator.WORKER_THREAD_TIMEOUT))
Logging.error("TAOS related threads blocked at (stack frames top-to-bottom):")
ts = ThreadStacks()
ts.print(filterInternal=True)
workerTimeout = True
# Enable below for deadlock debugging, using gdb to attach to process
# while True:
# Logging.error("Deadlock detected")
# time.sleep(60.0)
break
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
@ -701,7 +715,7 @@ class AnyState:
# task.logDebug("Task success found")
sCnt += 1
if (sCnt >= 2):
raise RuntimeError(
raise CrashGenError(
"Unexpected more than 1 success with task: {}".format(cls))
def assertIfExistThenSuccess(self, tasks, cls):
@ -714,7 +728,7 @@ class AnyState:
if task.isSuccess():
sCnt += 1
if (exists and sCnt <= 0):
raise RuntimeError("Unexpected zero success for task type: {}, from tasks: {}"
raise CrashGenError("Unexpected zero success for task type: {}, from tasks: {}"
.format(cls, tasks))
def assertNoTask(self, tasks, cls):
@ -727,7 +741,7 @@ class AnyState:
for task in tasks:
if isinstance(task, cls):
if task.isSuccess():
raise RuntimeError(
raise CrashGenError(
"Unexpected successful task: {}".format(cls))
def hasSuccess(self, tasks, cls):
@ -926,8 +940,9 @@ class StateMechine:
Logging.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
return StateDbOnly()
# For sure we have tables, which means we must have the super table. # TODO: are we sure?
sTable = self._db.getFixedSuperTable()
if sTable.hasRegTables(dbc, dbName): # no regular tables
if sTable.hasRegTables(dbc): # no regular tables
Logging.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
return StateSuperTableOnly()
else: # has actual tables
@ -1050,9 +1065,8 @@ class Database:
def getFixedSuperTableName(cls):
return "fs_table"
@classmethod
def getFixedSuperTable(cls) -> TdSuperTable:
return TdSuperTable(cls.getFixedSuperTableName())
def getFixedSuperTable(self) -> TdSuperTable:
return TdSuperTable(self.getFixedSuperTableName(), self.getName())
# We aim to create a starting time tick, such that, whenever we run our test here once
# We should be able to safely create 100,000 records, which will not have any repeated time stamp
@ -1107,6 +1121,11 @@ class Database:
# print("Float obtained: {}".format(ret))
return ret
ALL_COLORS = ['red', 'white', 'blue', 'green', 'purple']
def getNextColor(self):
return random.choice(self.ALL_COLORS)
class TaskExecutor():
class BoundedList:
@ -1240,7 +1259,7 @@ class Task():
if errno in [
0x05, # TSDB_CODE_RPC_NOT_READY
0x0B, # Unable to establish connection, more details in TD-1648
0x200, # invalid SQL TODO: re-examine with TD-934
# 0x200, # invalid SQL TODO: re-examine with TD-934
0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776
0x213, # "Disconnected from service", result of "kill connection ???"
0x217, # "db not selected", client side defined error code
@ -1569,8 +1588,8 @@ class TaskCreateSuperTable(StateTransitionTask):
sTable = self._db.getFixedSuperTable() # type: TdSuperTable
# wt.execSql("use db") # should always be in place
sTable.create(wt.getDbConn(), self._db.getName(),
{'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'},
sTable.create(wt.getDbConn(),
{'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}, {'b':'BINARY(200)', 'f':'FLOAT'},
dropIfExists = True
)
# self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
@ -1579,30 +1598,33 @@ class TaskCreateSuperTable(StateTransitionTask):
class TdSuperTable:
def __init__(self, stName):
def __init__(self, stName, dbName):
self._stName = stName
self._dbName = dbName
def getName(self):
return self._stName
def drop(self, dbc, dbName, skipCheck = False):
if self.exists(dbc, dbName) : # if myself exists
def drop(self, dbc, skipCheck = False):
dbName = self._dbName
if self.exists(dbc) : # if myself exists
fullTableName = dbName + '.' + self._stName
dbc.execute("DROP TABLE {}".format(fullTableName))
else:
if not skipCheck:
raise CrashGenError("Cannot drop non-existant super table: {}".format(self._stName))
def exists(self, dbc, dbName):
dbc.execute("USE " + dbName)
def exists(self, dbc):
dbc.execute("USE " + self._dbName)
return dbc.existsSuperTable(self._stName)
# TODO: odd semantic, create() method is usually static?
def create(self, dbc, dbName, cols: dict, tags: dict,
def create(self, dbc, cols: dict, tags: dict,
dropIfExists = False
):
'''Creating a super table'''
dbName = self._dbName
dbc.execute("USE " + dbName)
fullTableName = dbName + '.' + self._stName
if dbc.existsSuperTable(self._stName):
@ -1623,7 +1645,8 @@ class TdSuperTable:
)
dbc.execute(sql)
def getRegTables(self, dbc: DbConn, dbName: str):
def getRegTables(self, dbc: DbConn):
dbName = self._dbName
try:
dbc.query("select TBNAME from {}.{}".format(dbName, self._stName)) # TODO: analyze result set later
except taos.error.ProgrammingError as err:
@ -1634,10 +1657,11 @@ class TdSuperTable:
qr = dbc.getQueryResult()
return [v[0] for v in qr] # list transformation, ref: https://stackoverflow.com/questions/643823/python-list-transformation
def hasRegTables(self, dbc: DbConn, dbName: str):
return dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName)) > 0
def hasRegTables(self, dbc: DbConn):
return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
def ensureTable(self, task: Task, dbc: DbConn, dbName: str, regTableName: str):
def ensureTable(self, task: Task, dbc: DbConn, regTableName: str):
dbName = self._dbName
sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
if dbc.query(sql) >= 1 : # reg table exists already
return
@ -1650,15 +1674,15 @@ class TdSuperTable:
# print("(" + fullTableName[-3:] + ")", end="", flush=True)
try:
sql = "CREATE TABLE {} USING {}.{} tags ({})".format(
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc, dbName)
fullTableName, dbName, self._stName, self._getTagStrForSql(dbc)
)
dbc.execute(sql)
finally:
if task is not None:
task.unlockTable(fullTableName) # no matter what
def _getTagStrForSql(self, dbc, dbName: str) :
tags = self._getTags(dbc, dbName)
def _getTagStrForSql(self, dbc) :
tags = self._getTags(dbc)
tagStrs = []
for tagName in tags:
tagType = tags[tagName]
@ -1672,36 +1696,86 @@ class TdSuperTable:
raise RuntimeError("Unexpected tag type: {}".format(tagType))
return ", ".join(tagStrs)
def _getTags(self, dbc, dbName) -> dict:
dbc.query("DESCRIBE {}.{}".format(dbName, self._stName))
def _getTags(self, dbc) -> dict:
dbc.query("DESCRIBE {}.{}".format(self._dbName, self._stName))
stCols = dbc.getQueryResult()
# print(stCols)
ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
# print("Tags retrieved: {}".format(ret))
return ret
def addTag(self, dbc, dbName, tagName, tagType):
if tagName in self._getTags(dbc, dbName): # already
def addTag(self, dbc, tagName, tagType):
if tagName in self._getTags(dbc): # already
return
# sTable.addTag("extraTag", "int")
sql = "alter table {}.{} add tag {} {}".format(dbName, self._stName, tagName, tagType)
sql = "alter table {}.{} add tag {} {}".format(
self._dbName, self._stName, tagName, tagType)
dbc.execute(sql)
def dropTag(self, dbc, dbName, tagName):
if not tagName in self._getTags(dbc, dbName): # don't have this tag
def dropTag(self, dbc, tagName):
if not tagName in self._getTags(dbc): # don't have this tag
return
sql = "alter table {}.{} drop tag {}".format(dbName, self._stName, tagName)
sql = "alter table {}.{} drop tag {}".format(self._dbName, self._stName, tagName)
dbc.execute(sql)
def changeTag(self, dbc, dbName, oldTag, newTag):
tags = self._getTags(dbc, dbName)
def changeTag(self, dbc, oldTag, newTag):
tags = self._getTags(dbc)
if not oldTag in tags: # don't have this tag
return
if newTag in tags: # already have this tag
return
sql = "alter table {}.{} change tag {} {}".format(dbName, self._stName, oldTag, newTag)
sql = "alter table {}.{} change tag {} {}".format(self._dbName, self._stName, oldTag, newTag)
dbc.execute(sql)
def generateQueries(self, dbc: DbConn) -> List[SqlQuery]:
''' Generate queries to test/exercise this super table '''
ret = [] # type: List[SqlQuery]
for rTbName in self.getRegTables(dbc): # regular tables
filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
None
])
# Run the query against the regular table first
doAggr = (Dice.throw(2) == 0) # 1 in 2 chance
if not doAggr: # don't do aggregate query, just simple one
ret.append(SqlQuery( # reg table
"select {} from {}.{}".format('*', self._dbName, rTbName)))
ret.append(SqlQuery( # super table
"select {} from {}.{}".format('*', self._dbName, self.getName())))
else: # Aggregate query
aggExpr = Dice.choice([
'count(*)',
'avg(speed)',
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)',
'stddev(speed)',
# SELECTOR functions
'min(speed)',
'max(speed)',
'first(speed)',
'last(speed)',
'top(speed, 50)', # TODO: not supported?
'bottom(speed, 50)', # TODO: not supported?
'apercentile(speed, 10)', # TODO: TD-1316
'last_row(speed)',
# Transformation Functions
# 'diff(speed)', # TODO: no supported?!
'spread(speed)'
]) # TODO: add more from 'top'
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?!
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
if Dice.throw(3) == 0: # 1 in X chance
sql = sql + ' GROUP BY color'
Progress.emit(Progress.QUERY_GROUP_BY)
# Logging.info("Executing GROUP-BY query: " + sql)
ret.append(SqlQuery(sql))
return ret
class TaskReadData(StateTransitionTask):
@classmethod
def getEndState(cls):
@ -1716,10 +1790,8 @@ class TaskReadData(StateTransitionTask):
# return True # always
# return gSvcMgr.isActive() # only if it's running TODO: race condition here
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
sTable = self._db.getFixedSuperTable()
# 1 in 5 chance, simulate a broken connection, only if service stable (not restarting)
def _reconnectIfNeeded(self, wt):
# 1 in 20 chance, simulate a broken connection, only if service stable (not restarting)
if random.randrange(20)==0: # and self._canRestartService(): # TODO: break connection in all situations
# Logging.info("Attempting to reconnect to server") # TODO: change to DEBUG
Progress.emit(Progress.SERVICE_RECONNECT_START)
@ -1744,43 +1816,36 @@ class TaskReadData(StateTransitionTask):
return # TODO: fix server restart status race condtion
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
self._reconnectIfNeeded(wt)
dbc = wt.getDbConn()
dbName = self._db.getName()
for rTbName in sTable.getRegTables(dbc, dbName): # regular tables
aggExpr = Dice.choice([
'*',
'count(*)',
'avg(speed)',
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
'sum(speed)',
'stddev(speed)',
# SELECTOR functions
'min(speed)',
'max(speed)',
'first(speed)',
'last(speed)',
'top(speed, 50)', # TODO: not supported?
'bottom(speed, 50)', # TODO: not supported?
'apercentile(speed, 10)', # TODO: TD-1316
'last_row(speed)',
# Transformation Functions
# 'diff(speed)', # TODO: no supported?!
'spread(speed)'
]) # TODO: add more from 'top'
filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
None
])
sTable = self._db.getFixedSuperTable()
for q in sTable.generateQueries(dbc): # regular tables
try:
# Run the query against the regular table first
dbc.execute("select {} from {}.{}".format(aggExpr, dbName, rTbName))
# Then run it against the super table
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?!
dbc.execute("select {} from {}.{}".format(aggExpr, dbName, sTable.getName()))
sql = q.getSql()
# if 'GROUP BY' in sql:
# Logging.info("Executing GROUP-BY query: " + sql)
dbc.execute(sql)
except taos.error.ProgrammingError as err:
errno2 = Helper.convertErrno(err.errno)
Logging.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
raise
class SqlQuery:
@classmethod
def buildRandom(cls, db: Database):
'''Build a random query against a certain database'''
dbName = db.getName()
def __init__(self, sql:str = None):
self._sql = sql
def getSql(self):
return self._sql
class TaskDropSuperTable(StateTransitionTask):
@classmethod
def getEndState(cls):
@ -1837,19 +1902,18 @@ class TaskAlterTags(StateTransitionTask):
# tblName = self._dbManager.getFixedSuperTableName()
dbc = wt.getDbConn()
sTable = self._db.getFixedSuperTable()
dbName = self._db.getName()
dice = Dice.throw(4)
if dice == 0:
sTable.addTag(dbc, dbName, "extraTag", "int")
sTable.addTag(dbc, "extraTag", "int")
# sql = "alter table db.{} add tag extraTag int".format(tblName)
elif dice == 1:
sTable.dropTag(dbc, dbName, "extraTag")
sTable.dropTag(dbc, "extraTag")
# sql = "alter table db.{} drop tag extraTag".format(tblName)
elif dice == 2:
sTable.dropTag(dbc, dbName, "newTag")
sTable.dropTag(dbc, "newTag")
# sql = "alter table db.{} drop tag newTag".format(tblName)
else: # dice == 3
sTable.changeTag(dbc, dbName, "extraTag", "newTag")
sTable.changeTag(dbc, "extraTag", "newTag")
# sql = "alter table db.{} change tag extraTag newTag".format(tblName)
class TaskRestartService(StateTransitionTask):
@ -1920,15 +1984,17 @@ class TaskAddData(StateTransitionTask):
for j in range(numRecords): # number of records per table
nextInt = db.getNextInt()
nextTick = db.getNextTick()
sql += "('{}', {});".format(nextTick, nextInt)
nextColor = db.getNextColor()
sql += "('{}', {}, '{}');".format(nextTick, nextInt, nextColor)
dbc.execute(sql)
def _addData(self, db, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
for j in range(numRecords): # number of records per table
nextInt = db.getNextInt()
nextTick = db.getNextTick()
nextColor = db.getNextColor()
if gConfig.record_ops:
self.prepToRecordOps()
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
@ -1942,11 +2008,11 @@ class TaskAddData(StateTransitionTask):
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
try:
sql = "insert into {} values ('{}', {});".format( # removed: tags ('{}', {})
sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {})
fullTableName,
# ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(),
nextTick, nextInt)
nextTick, nextInt, nextColor)
dbc.execute(sql)
except: # Any exception at all
if gConfig.verify_data:
@ -1964,10 +2030,10 @@ class TaskAddData(StateTransitionTask):
.format(nextInt, readBack), 0x999)
except taos.error.ProgrammingError as err:
errno = Helper.convertErrno(err.errno)
if errno in [0x991, 0x992] : # not a single result
if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT] : # not a single result
raise taos.error.ProgrammingError(
"Failed to read back same data for tick: {}, wrote: {}, read: {}"
.format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"),
.format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"),
errno)
elif errno in [0x218, 0x362]: # table doesn't exist
# do nothing
@ -2000,11 +2066,12 @@ class TaskAddData(StateTransitionTask):
else:
self.activeTable.add(i) # marking it active
dbName = db.getName()
sTable = db.getFixedSuperTable()
regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
fullTableName = db.getName() + '.' + regTableName
fullTableName = dbName + '.' + regTableName
# self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
sTable.ensureTable(self, wt.getDbConn(), db.getName(), regTableName) # Ensure the table exists
sTable.ensureTable(self, wt.getDbConn(), regTableName) # Ensure the table exists
# self._unlockTable(fullTableName)
if Dice.throw(1) == 0: # 1 in 2 chance
@ -2024,7 +2091,7 @@ class ThreadStacks: # stack info for all threads
self._allStacks[th.native_id] = stack
def print(self, filteredEndName = None, filterInternal = False):
for thNid, stack in self._allStacks.items(): # for each thread
for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom
lastFrame = stack[-1]
if filteredEndName: # we need to filter out stacks that match this name
if lastFrame.name == filteredEndName : # end did not match
@ -2036,9 +2103,9 @@ class ThreadStacks: # stack info for all threads
'__init__']: # the thread that extracted the stack
continue # ignore
# Now print
print("\n<----- Thread Info for LWP/ID: {} (Execution stopped at Bottom Frame) <-----".format(thNid))
print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid))
stackFrame = 0
for frame in stack:
for frame in stack: # was using: reversed(stack)
# print(frame)
print("[{sf}] File {filename}, line {lineno}, in {name}".format(
sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))

View File

@ -78,7 +78,7 @@ class DbConn:
if nRows != 1:
raise taos.error.ProgrammingError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows),
(0x991 if nRows==0 else 0x992)
(CrashGenError.INVALID_EMPTY_RESULT if nRows==0 else CrashGenError.INVALID_MULTIPLE_RESULT)
)
if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
@ -349,7 +349,8 @@ class DbConnNative(DbConn):
def execute(self, sql):
if (not self.isOpen):
raise RuntimeError("Cannot execute database commands until connection is open")
raise CrashGenError(
"Cannot exec SQL unless db connection is open", CrashGenError.DB_CONNECTION_NOT_OPEN)
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.execute(sql)
@ -360,8 +361,8 @@ class DbConnNative(DbConn):
def query(self, sql): # return rows affected
if (not self.isOpen):
raise RuntimeError(
"Cannot query database until connection is open")
raise CrashGenError(
"Cannot query database until connection is open, restarting?", CrashGenError.DB_CONNECTION_NOT_OPEN)
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.query(sql)

View File

@ -3,14 +3,20 @@ import random
import logging
import os
import taos
class CrashGenError(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self.errno = errno
def __str__(self):
return self.msg
class CrashGenError(taos.error.ProgrammingError):
INVALID_EMPTY_RESULT = 0x991
INVALID_MULTIPLE_RESULT = 0x992
DB_CONNECTION_NOT_OPEN = 0x993
# def __init__(self, msg=None, errno=None):
# self.msg = msg
# self.errno = errno
# def __str__(self):
# return self.msg
pass
class LoggingFilter(logging.Filter):
@ -168,6 +174,7 @@ class Progress:
SERVICE_RECONNECT_FAILURE = 6
SERVICE_START_NAP = 7
CREATE_TABLE_ATTEMPT = 8
QUERY_GROUP_BY = 9
tokens = {
STEP_BOUNDARY: '.',
@ -178,7 +185,8 @@ class Progress:
SERVICE_RECONNECT_SUCCESS: '.r>',
SERVICE_RECONNECT_FAILURE: '.xr>',
SERVICE_START_NAP: '_zz',
CREATE_TABLE_ATTEMPT: '_c',
CREATE_TABLE_ATTEMPT: 'c',
QUERY_GROUP_BY: 'g',
}
@classmethod

View File

@ -51,10 +51,12 @@ class TdeInstance():
def prepareGcovEnv(cls, env):
# Ref: https://gcc.gnu.org/onlinedocs/gcc/Cross-profiling.html
bPath = cls._getBuildPath() # build PATH
numSegments = len(bPath.split('/')) - 1 # "/x/TDengine/build" should yield 3
numSegments = numSegments - 1 # DEBUG only
env['GCOV_PREFIX'] = bPath + '/svc_gcov'
numSegments = len(bPath.split('/')) # "/x/TDengine/build" should yield 3
# numSegments += 2 # cover "/src" after build
# numSegments = numSegments - 1 # DEBUG only
env['GCOV_PREFIX'] = bPath + '/src_s' # Server side source
env['GCOV_PREFIX_STRIP'] = str(numSegments) # Strip every element, plus, ENV needs strings
# VERY VERY important note: GCOV data collection NOT effective upon SIG_KILL
Logging.info("Preparing GCOV environement to strip {} elements and use path: {}".format(
numSegments, env['GCOV_PREFIX'] ))
@ -258,14 +260,15 @@ class TdeSubProcess:
TdeInstance.prepareGcovEnv(myEnv)
# print(myEnv)
# print(myEnv.items())
# print("Starting TDengine with env: ", myEnv.items())
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
useShell = True
self.subProcess = subprocess.Popen(
' '.join(cmdLine) if useShell else cmdLine,
shell=useShell,
# svcCmdSingle, shell=True, # capture core dump?
# ' '.join(cmdLine) if useShell else cmdLine,
# shell=useShell,
' '.join(cmdLine),
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# bufsize=1, # not supported in binary mode
@ -273,7 +276,8 @@ class TdeSubProcess:
env=myEnv
) # had text=True, which interferred with reading EOF
STOP_SIGNAL = signal.SIGKILL # What signal to use (in kill) to stop a taosd process?
STOP_SIGNAL = signal.SIGKILL # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process?
SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm
def stop(self):
"""
@ -320,8 +324,12 @@ class TdeSubProcess:
retCode = self.subProcess.returncode # should always be there
# May throw subprocess.TimeoutExpired exception above, therefore
# The process is guranteed to have ended by now
self.subProcess = None
if retCode != 0: # != (- signal.SIGINT):
self.subProcess = None
if retCode == self.SIG_KILL_RETCODE:
Logging.info("TSP.stop(): sub proc KILLED, as expected")
elif retCode == (- self.STOP_SIGNAL):
Logging.info("TSP.stop(), sub process STOPPED, as expected")
elif retCode != 0: # != (- signal.SIGINT):
Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(
self.STOP_SIGNAL, retCode))
else:

View File

@ -19,6 +19,7 @@ python3 ./test.py -f insert/randomNullCommit.py
python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f insert/insertIntoTwoTables.py
python3 ./test.py -f insert/before_1970.py
python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py
@ -228,6 +229,7 @@ python3 ./test.py -f update/merge_commit_data2.py
python3 ./test.py -f update/merge_commit_data2_update0.py
python3 ./test.py -f update/merge_commit_last-0.py
python3 ./test.py -f update/merge_commit_last.py
python3 ./test.py -f update/bug_td2279.py
# wal
python3 ./test.py -f wal/addOldWalTest.py

View File

@ -44,22 +44,9 @@ class RestfulInsert:
if response.status_code != 200:
print(response.content)
def insertData(self, threadID):
print("thread %d started" % threadID)
tablesPerThread = int (self.numOfTables / self.numOfThreads)
for i in range(tablesPerThread):
tableID = i + threadID * tablesPerThread
start = self.ts
for j in range(int(self.recordsPerTable / self.batchSize)):
data = "insert into %s.%s%d values" % (self.dbname, self.tableNamePerfix, tableID)
values = []
for k in range(self.batchSize):
data += "(%d, %d, %d, %d)" % (start + j * self.batchSize + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))
response = requests.post(self.url, data, headers = self.header)
if response.status_code != 200:
print(response.content)
def insertnData(self, threadID):
def insertData(self, threadID):
print("thread %d started" % threadID)
tablesPerThread = int (self.numOfTables / self.numOfThreads)
loop = int(self.recordsPerTable / self.batchSize)
@ -81,11 +68,9 @@ class RestfulInsert:
if self.outOfOrder :
random.shuffle(values)
data+=''.join(values)
response = requests.post(self.url, data, headers = self.header)
if response.status_code != 200:
print(response.content)
print('----------------',loop,time.time()-start1)
else:
for i in range(0,tablesPerThread+self.tablePerbatch,self.tablePerbatch):
for k in range(loop):
@ -101,8 +86,7 @@ class RestfulInsert:
values.append("(%d, %d, %d, %d)" % (start + k * self.batchSize + l, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100)))
if self.outOfOrder :
random.shuffle(values)
data+=''.join(values)
print('------------------',len(data))
data+=''.join(values)
if len(data) > 1024*1024 :
print ('batch size is larger than 1M')
exit(-1)
@ -169,7 +153,7 @@ class RestfulInsert:
for i in range(self.numOfThreads):
threads[i].join()
print("inserting %d records takes %d seconds" % (self.numOfTables * self.recordsPerTable, (time.time() - startTime)))
print("inserting %s records takes %d seconds" % (self.numOfTables * self.recordsPerTable, (time.time() - startTime)))
parser = argparse.ArgumentParser()
parser.add_argument(
@ -204,14 +188,14 @@ parser.add_argument(
'-T',
'--number-of-tables',
action='store',
default=1000,
default=10000,
type=int,
help='Number of tables to be created (default: 1000)')
parser.add_argument(
'-r',
'--number-of-records',
action='store',
default=1000,
default=10000,
type=int,
help='Number of record to be created for each table (default: 1000, -1 for unlimited records)')
parser.add_argument(

View File

@ -0,0 +1,67 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.ts = 1606700000000
def restartTaosd(self):
tdDnodes.stop(1)
tdDnodes.startWithoutSleep(1)
tdSql.execute("use db")
def run(self):
tdSql.prepare()
print("==============step1")
tdSql.execute("create table t (ts timestamp, a int)")
for i in range(3276):
tdSql.execute("insert into t values(%d, 0)" % (self.ts + i))
newTs = 1606700010000
for i in range(3275):
tdSql.execute("insert into t values(%d, 0)" % (self.ts + i))
tdSql.execute("insert into t values(%d, 0)" % 1606700013280)
self.restartTaosd()
for i in range(1606700003275, 1606700006609):
tdSql.execute("insert into t values(%d, 0)" % i)
tdSql.execute("insert into t values(%d, 0)" % 1606700006612)
self.restartTaosd()
tdSql.execute("insert into t values(%d, 0)" % 1606700006610)
tdSql.query("select * from t")
tdSql.checkRows(6612)
tdDnodes.stop(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -24,7 +24,7 @@ sql alter dnode 1 debugFlag 135
sql alter dnode 1 debugFlag 131
sql alter dnode 1 monitor 0
sql alter dnode 1 debugFlag 135
sql alter dnode 1 monitorDebugFlag 135
sql alter dnode 1 monDebugFlag 135
sql alter dnode 1 vDebugFlag 135
sql alter dnode 1 mDebugFlag 135
sql alter dnode 1 cDebugFlag 135
@ -44,15 +44,15 @@ sql_error alter dnode 2 tmrDebugFlag 135
print ======== step3
sql_error alter $hostname1 debugFlag 135
sql_error alter $hostname1 monitorDebugFlag 135
sql_error alter $hostname1 monDebugFlag 135
sql_error alter $hostname1 vDebugFlag 135
sql_error alter $hostname1 mDebugFlag 135
sql_error alter dnode $hostname2 debugFlag 135
sql_error alter dnode $hostname2 monitorDebugFlag 135
sql_error alter dnode $hostname2 monDebugFlag 135
sql_error alter dnode $hostname2 vDebugFlag 135
sql_error alter dnode $hostname2 mDebugFlag 135
sql alter dnode $hostname1 debugFlag 135
sql alter dnode $hostname1 monitorDebugFlag 135
sql alter dnode $hostname1 monDebugFlag 135
sql alter dnode $hostname1 vDebugFlag 135
sql alter dnode $hostname1 tmrDebugFlag 131

View File

@ -120,7 +120,7 @@ echo "cDebugFlag 143" >> $TAOS_CFG
echo "jnidebugFlag 143" >> $TAOS_CFG
echo "odbcdebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 143" >> $TAOS_CFG
echo "monDebugFlag 143" >> $TAOS_CFG
echo "mqttDebugFlag 143" >> $TAOS_CFG
echo "qdebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG