Merge branch 'patch/TD-1983' of https://github.com/taosdata/TDengine into patch/TD-1983
This commit is contained in:
commit
dbd975b3a0
|
@ -97,6 +97,7 @@ extern int32_t tsAlternativeRole;
|
||||||
extern int32_t tsBalanceInterval;
|
extern int32_t tsBalanceInterval;
|
||||||
extern int32_t tsOfflineThreshold;
|
extern int32_t tsOfflineThreshold;
|
||||||
extern int32_t tsMnodeEqualVnodeNum;
|
extern int32_t tsMnodeEqualVnodeNum;
|
||||||
|
extern int32_t tsFlowCtrl;
|
||||||
|
|
||||||
// restful
|
// restful
|
||||||
extern int32_t tsEnableHttpModule;
|
extern int32_t tsEnableHttpModule;
|
||||||
|
|
|
@ -133,6 +133,7 @@ int32_t tsAlternativeRole = 0;
|
||||||
int32_t tsBalanceInterval = 300; // seconds
|
int32_t tsBalanceInterval = 300; // seconds
|
||||||
int32_t tsOfflineThreshold = 86400*100; // seconds 10days
|
int32_t tsOfflineThreshold = 86400*100; // seconds 10days
|
||||||
int32_t tsMnodeEqualVnodeNum = 4;
|
int32_t tsMnodeEqualVnodeNum = 4;
|
||||||
|
int32_t tsFlowCtrl = 1;
|
||||||
|
|
||||||
// restful
|
// restful
|
||||||
int32_t tsEnableHttpModule = 1;
|
int32_t tsEnableHttpModule = 1;
|
||||||
|
@ -971,6 +972,17 @@ static void doInitGlobalConfig(void) {
|
||||||
cfg.maxValue = 1000;
|
cfg.maxValue = 1000;
|
||||||
cfg.ptrLength = 0;
|
cfg.ptrLength = 0;
|
||||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||||
|
taosInitConfigOption(cfg);
|
||||||
|
|
||||||
|
// module configs
|
||||||
|
cfg.option = "flowctrl";
|
||||||
|
cfg.ptr = &tsFlowCtrl;
|
||||||
|
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||||
|
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||||
|
cfg.minValue = 0;
|
||||||
|
cfg.maxValue = 1;
|
||||||
|
cfg.ptrLength = 0;
|
||||||
|
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||||
taosInitConfigOption(cfg);
|
taosInitConfigOption(cfg);
|
||||||
|
|
||||||
cfg.option = "http";
|
cfg.option = "http";
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
package com.taosdata.jdbc.cases;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.DriverManager;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
|
||||||
|
public class AppMemoryLeakTest {
|
||||||
|
|
||||||
|
@Test(expected = SQLException.class)
|
||||||
|
public void testCreateTooManyConnection() throws ClassNotFoundException, SQLException {
|
||||||
|
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||||
|
int conCnt = 0;
|
||||||
|
while (true) {
|
||||||
|
Connection conn = DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata");
|
||||||
|
System.out.println(conCnt++ + " : " + conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateTooManyStatement() throws ClassNotFoundException, SQLException {
|
||||||
|
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||||
|
int stmtCnt = 0;
|
||||||
|
Connection conn = DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata");
|
||||||
|
while (true) {
|
||||||
|
Statement stmt = conn.createStatement();
|
||||||
|
System.out.println(++stmtCnt + " : " + stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws ClassNotFoundException, SQLException {
|
||||||
|
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||||
|
int stmtCnt = 0;
|
||||||
|
Connection conn = DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata");
|
||||||
|
while (true) {
|
||||||
|
Statement stmt = conn.createStatement();
|
||||||
|
System.out.println(++stmtCnt + " : " + stmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -162,7 +162,7 @@ static void *dnodeProcessMPeerQueue(void *param) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
dDebug("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]);
|
dTrace("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]);
|
||||||
int32_t code = mnodeProcessPeerReq(pPeerMsg);
|
int32_t code = mnodeProcessPeerReq(pPeerMsg);
|
||||||
dnodeSendRpcMPeerRsp(pPeerMsg, code);
|
dnodeSendRpcMPeerRsp(pPeerMsg, code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,7 +168,7 @@ static void *dnodeProcessMReadQueue(void *param) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
dDebug("msg:%p, app:%p type:%s will be processed in mread queue", pRead->rpcMsg.ahandle, pRead,
|
dTrace("msg:%p, app:%p type:%s will be processed in mread queue", pRead->rpcMsg.ahandle, pRead,
|
||||||
taosMsg[pRead->rpcMsg.msgType]);
|
taosMsg[pRead->rpcMsg.msgType]);
|
||||||
int32_t code = mnodeProcessRead(pRead);
|
int32_t code = mnodeProcessRead(pRead);
|
||||||
dnodeSendRpcMReadRsp(pRead, code);
|
dnodeSendRpcMReadRsp(pRead, code);
|
||||||
|
|
|
@ -174,7 +174,7 @@ static void *dnodeProcessMWriteQueue(void *param) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
dDebug("msg:%p, app:%p type:%s will be processed in mwrite queue", pWrite, pWrite->rpcMsg.ahandle,
|
dTrace("msg:%p, app:%p type:%s will be processed in mwrite queue", pWrite, pWrite->rpcMsg.ahandle,
|
||||||
taosMsg[pWrite->rpcMsg.msgType]);
|
taosMsg[pWrite->rpcMsg.msgType]);
|
||||||
|
|
||||||
int32_t code = mnodeProcessWrite(pWrite);
|
int32_t code = mnodeProcessWrite(pWrite);
|
||||||
|
|
|
@ -151,9 +151,9 @@ void dnodeCleanupClient() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
||||||
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) {
|
if (dnodeGetRunStatus() == TSDB_RUN_STATUS_STOPPED) {
|
||||||
if (pMsg == NULL || pMsg->pCont == NULL) return;
|
if (pMsg == NULL || pMsg->pCont == NULL) return;
|
||||||
dDebug("msg:%p is ignored since dnode not running", pMsg);
|
dDebug("msg:%p is ignored since dnode is stopping", pMsg);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,7 +175,7 @@ static void *dnodeProcessReadQueue(void *pWorker) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
dDebug("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle,
|
dTrace("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle,
|
||||||
taosMsg[pRead->msgType], qtype);
|
taosMsg[pRead->msgType], qtype);
|
||||||
|
|
||||||
int32_t code = vnodeProcessRead(pVnode, pRead);
|
int32_t code = vnodeProcessRead(pVnode, pRead);
|
||||||
|
|
|
@ -176,7 +176,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
|
||||||
if (count <= 1) return;
|
if (count <= 1) return;
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
.handle = pWrite->rpcHandle,
|
.handle = pWrite->rpcMsg.handle,
|
||||||
.pCont = pWrite->rspRet.rsp,
|
.pCont = pWrite->rspRet.rsp,
|
||||||
.contLen = pWrite->rspRet.len,
|
.contLen = pWrite->rspRet.len,
|
||||||
.code = pWrite->code,
|
.code = pWrite->code,
|
||||||
|
@ -206,7 +206,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
|
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
|
||||||
dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
|
dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
|
||||||
pWrite->rpcAhandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version);
|
pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version);
|
||||||
|
|
||||||
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
|
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
|
||||||
if (pWrite->code <= 0) pWrite->processedCount = 1;
|
if (pWrite->code <= 0) pWrite->processedCount = 1;
|
||||||
|
|
|
@ -205,6 +205,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid ve
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_SYNCING, 0, 0x0513, "Database is syncing")
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID")
|
||||||
|
|
|
@ -23,7 +23,7 @@ extern "C" {
|
||||||
#define TAOS_SYNC_MAX_REPLICA 5
|
#define TAOS_SYNC_MAX_REPLICA 5
|
||||||
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
|
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
|
||||||
|
|
||||||
typedef enum _TAOS_SYNC_ROLE {
|
typedef enum {
|
||||||
TAOS_SYNC_ROLE_OFFLINE = 0,
|
TAOS_SYNC_ROLE_OFFLINE = 0,
|
||||||
TAOS_SYNC_ROLE_UNSYNCED = 1,
|
TAOS_SYNC_ROLE_UNSYNCED = 1,
|
||||||
TAOS_SYNC_ROLE_SYNCING = 2,
|
TAOS_SYNC_ROLE_SYNCING = 2,
|
||||||
|
@ -31,7 +31,7 @@ typedef enum _TAOS_SYNC_ROLE {
|
||||||
TAOS_SYNC_ROLE_MASTER = 4
|
TAOS_SYNC_ROLE_MASTER = 4
|
||||||
} ESyncRole;
|
} ESyncRole;
|
||||||
|
|
||||||
typedef enum _TAOS_SYNC_STATUS {
|
typedef enum {
|
||||||
TAOS_SYNC_STATUS_INIT = 0,
|
TAOS_SYNC_STATUS_INIT = 0,
|
||||||
TAOS_SYNC_STATUS_START = 1,
|
TAOS_SYNC_STATUS_START = 1,
|
||||||
TAOS_SYNC_STATUS_FILE = 2,
|
TAOS_SYNC_STATUS_FILE = 2,
|
||||||
|
@ -80,7 +80,7 @@ typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code);
|
||||||
typedef void (*FNotifyRole)(int32_t vgId, int8_t role);
|
typedef void (*FNotifyRole)(int32_t vgId, int8_t role);
|
||||||
|
|
||||||
// if a number of retrieving data failed, call this to start flow control
|
// if a number of retrieving data failed, call this to start flow control
|
||||||
typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds);
|
typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
|
||||||
|
|
||||||
// when data file is synced successfully, notity app
|
// when data file is synced successfully, notity app
|
||||||
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
|
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "trpc.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
|
|
||||||
typedef enum _VN_STATUS {
|
typedef enum _VN_STATUS {
|
||||||
|
@ -51,8 +52,9 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t processedCount;
|
int32_t processedCount;
|
||||||
void * rpcHandle;
|
int32_t qtype;
|
||||||
void * rpcAhandle;
|
void * pVnode;
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
SRspRet rspRet;
|
SRspRet rspRet;
|
||||||
char reserveForSync[16];
|
char reserveForSync[16];
|
||||||
SWalHead pHead[];
|
SWalHead pHead[];
|
||||||
|
|
|
@ -77,6 +77,7 @@ extern "C" {
|
||||||
#include <sys/utsname.h>
|
#include <sys/utsname.h>
|
||||||
#include <sys/resource.h>
|
#include <sys/resource.h>
|
||||||
#include <linux/sysctl.h>
|
#include <linux/sysctl.h>
|
||||||
|
#include <math.h>
|
||||||
|
|
||||||
typedef int(*__compar_fn_t)(const void *, const void *);
|
typedef int(*__compar_fn_t)(const void *, const void *);
|
||||||
void error (int, int, const char *);
|
void error (int, int, const char *);
|
||||||
|
|
|
@ -76,6 +76,7 @@ extern "C" {
|
||||||
#include <sys/utsname.h>
|
#include <sys/utsname.h>
|
||||||
#include <sys/resource.h>
|
#include <sys/resource.h>
|
||||||
#include <error.h>
|
#include <error.h>
|
||||||
|
#include <math.h>
|
||||||
|
|
||||||
#define TAOS_OS_FUNC_LZ4
|
#define TAOS_OS_FUNC_LZ4
|
||||||
#define BUILDIN_CLZL(val) __builtin_clzll(val)
|
#define BUILDIN_CLZL(val) __builtin_clzll(val)
|
||||||
|
|
|
@ -77,6 +77,7 @@ extern "C" {
|
||||||
#include <sys/resource.h>
|
#include <sys/resource.h>
|
||||||
#include <error.h>
|
#include <error.h>
|
||||||
#include <linux/sysctl.h>
|
#include <linux/sysctl.h>
|
||||||
|
#include <math.h>
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,7 @@ extern "C" {
|
||||||
#include <dispatch/dispatch.h>
|
#include <dispatch/dispatch.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <sys/utsname.h>
|
#include <sys/utsname.h>
|
||||||
|
#include <math.h>
|
||||||
|
|
||||||
#define TAOS_OS_FUNC_FILE_SENDIFLE
|
#define TAOS_OS_FUNC_FILE_SENDIFLE
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,7 @@ extern "C" {
|
||||||
#include <sys/utsname.h>
|
#include <sys/utsname.h>
|
||||||
#include <sys/resource.h>
|
#include <sys/resource.h>
|
||||||
#include <error.h>
|
#include <error.h>
|
||||||
|
#include <math.h>
|
||||||
|
|
||||||
#define TAOS_OS_FUNC_LZ4
|
#define TAOS_OS_FUNC_LZ4
|
||||||
#define BUILDIN_CLZL(val) __builtin_clzll(val)
|
#define BUILDIN_CLZL(val) __builtin_clzll(val)
|
||||||
|
|
|
@ -79,6 +79,7 @@ extern "C" {
|
||||||
#include <error.h>
|
#include <error.h>
|
||||||
#endif
|
#endif
|
||||||
#include <linux/sysctl.h>
|
#include <linux/sysctl.h>
|
||||||
|
#include <math.h>
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include <inttypes.h>
|
#include <inttypes.h>
|
||||||
#include <conio.h>
|
#include <conio.h>
|
||||||
|
#include <math.h>
|
||||||
#include "msvcProcess.h"
|
#include "msvcProcess.h"
|
||||||
#include "msvcDirect.h"
|
#include "msvcDirect.h"
|
||||||
#include "msvcFcntl.h"
|
#include "msvcFcntl.h"
|
||||||
|
|
|
@ -504,6 +504,8 @@ void *syncRetrieveData(void *param) {
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
taosBlockSIGPIPE();
|
taosBlockSIGPIPE();
|
||||||
|
|
||||||
|
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
|
||||||
|
|
||||||
pPeer->fileChanged = 0;
|
pPeer->fileChanged = 0;
|
||||||
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
|
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
|
||||||
if (pPeer->syncFd < 0) {
|
if (pPeer->syncFd < 0) {
|
||||||
|
@ -520,10 +522,7 @@ void *syncRetrieveData(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pPeer->fileChanged) {
|
if (pPeer->fileChanged) {
|
||||||
// if file is changed 3 times continuously, start flow control
|
|
||||||
pPeer->numOfRetrieves++;
|
pPeer->numOfRetrieves++;
|
||||||
if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl)
|
|
||||||
(*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2));
|
|
||||||
} else {
|
} else {
|
||||||
pPeer->numOfRetrieves = 0;
|
pPeer->numOfRetrieves = 0;
|
||||||
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
|
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
|
||||||
|
|
|
@ -871,10 +871,10 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
||||||
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||||
|
|
||||||
if (pBufBlock->offset == 0) { // return the block to buffer pool
|
if (pBufBlock->offset == 0) { // return the block to buffer pool
|
||||||
tsdbLockRepo(pRepo);
|
if (tsdbLockRepo(pRepo) < 0) return;
|
||||||
SListNode *pNode = tdListPopTail(pRepo->mem->bufBlockList);
|
SListNode *pNode = tdListPopTail(pRepo->mem->bufBlockList);
|
||||||
tdListPrependNode(pBufPool->bufBlockList, pNode);
|
tdListPrependNode(pBufPool->bufBlockList, pNode);
|
||||||
tsdbUnlockRepo(pRepo);
|
if (tsdbUnlockRepo(pRepo) < 0) return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(listNEles(pRepo->mem->extraBuffList) > 0);
|
ASSERT(listNEles(pRepo->mem->extraBuffList) > 0);
|
||||||
|
|
|
@ -291,7 +291,10 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
|
||||||
iter->next = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0);
|
iter->next = SL_NODE_GET_FORWARD_POINTER(iter->cur, 0);
|
||||||
iter->step++;
|
iter->step++;
|
||||||
} else {
|
} else {
|
||||||
if (iter->cur == pSkipList->pHead) return false;
|
if (iter->cur == pSkipList->pHead) {
|
||||||
|
tSkipListUnlock(pSkipList);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0);
|
iter->cur = SL_NODE_GET_BACKWARD_POINTER(iter->cur, 0);
|
||||||
|
|
||||||
// a new node is inserted into between iter->cur and iter->next, ignore it
|
// a new node is inserted into between iter->cur and iter->next, ignore it
|
||||||
|
|
|
@ -39,7 +39,7 @@ typedef struct {
|
||||||
int32_t refCount; // reference count
|
int32_t refCount; // reference count
|
||||||
int32_t queuedWMsg;
|
int32_t queuedWMsg;
|
||||||
int32_t queuedRMsg;
|
int32_t queuedRMsg;
|
||||||
int32_t delayMs;
|
int32_t flowctrlLevel;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int8_t role;
|
int8_t role;
|
||||||
int8_t accessState;
|
int8_t accessState;
|
||||||
|
|
|
@ -34,7 +34,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
|
||||||
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
|
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
|
||||||
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
|
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
|
||||||
static void vnodeNotifyRole(int32_t vgId, int8_t role);
|
static void vnodeNotifyRole(int32_t vgId, int8_t role);
|
||||||
static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds);
|
static void vnodeCtrlFlow(int32_t vgId, int32_t level);
|
||||||
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
|
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
|
||||||
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
|
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
|
||||||
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
|
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
|
||||||
|
@ -659,7 +659,7 @@ static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
|
||||||
static void vnodeNotifyRole(int32_t vgId, int8_t role) {
|
static void vnodeNotifyRole(int32_t vgId, int8_t role) {
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
vError("vgId:%d, vnode not found while notify role", vgId);
|
vTrace("vgId:%d, vnode not found while notify role", vgId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -676,17 +676,15 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) {
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) {
|
static void vnodeCtrlFlow(int32_t vgId, int32_t level) {
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
vError("vgId:%d, vnode not found while ctrl flow", vgId);
|
vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->delayMs != mseconds) {
|
pVnode->flowctrlLevel = level;
|
||||||
pVnode->delayMs = mseconds;
|
vDebug("vgId:%d, set flowctrl level:%d", pVnode->vgId, level);
|
||||||
vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,19 +17,23 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
#include "tglobal.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
#include "tsync.h"
|
#include "tsync.h"
|
||||||
|
#include "ttimer.h"
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
#include "tcq.h"
|
#include "tcq.h"
|
||||||
|
#include "dnode.h"
|
||||||
|
|
||||||
#define MAX_QUEUED_MSG_NUM 10000
|
#define MAX_QUEUED_MSG_NUM 10000
|
||||||
|
|
||||||
|
extern void * tsDnodeTmr;
|
||||||
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
|
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
|
||||||
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
||||||
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
||||||
|
@ -37,6 +41,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
|
||||||
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
||||||
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
||||||
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
|
||||||
|
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite);
|
||||||
|
|
||||||
void vnodeInitWriteFp(void) {
|
void vnodeInitWriteFp(void) {
|
||||||
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
|
||||||
|
@ -77,8 +82,6 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
|
||||||
|
|
||||||
// assign version
|
// assign version
|
||||||
pHead->version = pVnode->version + 1;
|
pHead->version = pVnode->version + 1;
|
||||||
if (pVnode->delayMs) taosMsleep(pVnode->delayMs);
|
|
||||||
|
|
||||||
} else { // from wal or forward
|
} else { // from wal or forward
|
||||||
// for data from WAL or forward, version may be smaller
|
// for data from WAL or forward, version may be smaller
|
||||||
if (pHead->version <= pVnode->version) return 0;
|
if (pHead->version <= pVnode->version) return 0;
|
||||||
|
@ -218,9 +221,10 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
|
||||||
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
|
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
|
||||||
SVnodeObj *pVnode = vparam;
|
SVnodeObj *pVnode = vparam;
|
||||||
SWalHead * pHead = wparam;
|
SWalHead * pHead = wparam;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
if (qtype == TAOS_QTYPE_RPC) {
|
if (qtype == TAOS_QTYPE_RPC) {
|
||||||
int32_t code = vnodeCheckWrite(pVnode);
|
code = vnodeCheckWrite(pVnode);
|
||||||
if (code != TSDB_CODE_SUCCESS) return code;
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,11 +241,12 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
|
||||||
|
|
||||||
if (rparam != NULL) {
|
if (rparam != NULL) {
|
||||||
SRpcMsg *pRpcMsg = rparam;
|
SRpcMsg *pRpcMsg = rparam;
|
||||||
pWrite->rpcHandle = pRpcMsg->handle;
|
pWrite->rpcMsg = *pRpcMsg;
|
||||||
pWrite->rpcAhandle = pRpcMsg->ahandle;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
|
memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
|
||||||
|
pWrite->pVnode = pVnode;
|
||||||
|
pWrite->qtype = qtype;
|
||||||
|
|
||||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
|
|
||||||
|
@ -251,6 +256,9 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
|
||||||
taosMsleep(1);
|
taosMsleep(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = vnodePerformFlowCtrl(pWrite);
|
||||||
|
if (code != 0) return 0;
|
||||||
|
|
||||||
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
|
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
|
||||||
|
|
||||||
taosWriteQitem(pVnode->wqueue, qtype, pWrite);
|
taosWriteQitem(pVnode->wqueue, qtype, pWrite);
|
||||||
|
@ -260,9 +268,50 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
|
||||||
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
|
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
|
||||||
SVnodeObj *pVnode = vparam;
|
SVnodeObj *pVnode = vparam;
|
||||||
|
|
||||||
atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
|
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
|
||||||
vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
|
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued);
|
||||||
|
|
||||||
taosFreeQitem(pWrite);
|
taosFreeQitem(pWrite);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
|
||||||
|
SVWriteMsg *pWrite = param;
|
||||||
|
SVnodeObj * pVnode = pWrite->pVnode;
|
||||||
|
int32_t code = TSDB_CODE_VND_SYNCING;
|
||||||
|
|
||||||
|
pWrite->processedCount++;
|
||||||
|
if (pWrite->processedCount > 100) {
|
||||||
|
vError("vgId:%d, msg:%p, failed to process since %s", pVnode->vgId, pWrite, tstrerror(code));
|
||||||
|
pWrite->processedCount = 1;
|
||||||
|
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
|
||||||
|
} else {
|
||||||
|
code = vnodePerformFlowCtrl(pWrite);
|
||||||
|
if (code == 0) {
|
||||||
|
vTrace("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId);
|
||||||
|
pWrite->processedCount = 0;
|
||||||
|
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
|
||||||
|
SVnodeObj *pVnode = pWrite->pVnode;
|
||||||
|
if (pVnode->flowctrlLevel <= 0) return 0;
|
||||||
|
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
|
||||||
|
|
||||||
|
if (tsFlowCtrl == 0) {
|
||||||
|
int32_t ms = pow(2, pVnode->flowctrlLevel + 2);
|
||||||
|
if (ms > 100) ms = 100;
|
||||||
|
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms);
|
||||||
|
taosMsleep(ms);
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
void *unUsed = NULL;
|
||||||
|
taosTmrReset(vnodeFlowCtrlMsgToWQueue, 100, pWrite, tsDnodeTmr, &unUsed);
|
||||||
|
|
||||||
|
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, count:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle,
|
||||||
|
pWrite->processedCount);
|
||||||
|
return TSDB_CODE_VND_ACTION_IN_PROGRESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -146,7 +146,7 @@ class ConcurrentInquiry:
|
||||||
col_list=self.stb_stru_list[tbi-1]
|
col_list=self.stb_stru_list[tbi-1]
|
||||||
tag_list=self.stb_tag_list[tbi-1]
|
tag_list=self.stb_tag_list[tbi-1]
|
||||||
is_stb=1
|
is_stb=1
|
||||||
tlist=col_list+tag_list
|
tlist=col_list+tag_list+['abc'] #增加不存在的域'abc',是否会引起新bug
|
||||||
con_rand=random.randint(0,len(condition_list))
|
con_rand=random.randint(0,len(condition_list))
|
||||||
func_rand=random.randint(0,len(func_list))
|
func_rand=random.randint(0,len(func_list))
|
||||||
col_rand=random.randint(0,len(col_list))
|
col_rand=random.randint(0,len(col_list))
|
||||||
|
|
|
@ -18,10 +18,10 @@ import time
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
class RestfulInsert:
|
class RestfulInsert:
|
||||||
def __init__(self, host, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder):
|
def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder):
|
||||||
self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
|
self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
|
||||||
self.url = "http://%s:6041/rest/sql" % host
|
self.url = "http://%s:6041/rest/sql" % host
|
||||||
self.ts = 1500000000000
|
self.ts = startTimestamp
|
||||||
self.dbname = dbname
|
self.dbname = dbname
|
||||||
self.numOfThreads = threads
|
self.numOfThreads = threads
|
||||||
self.numOfTables = tables
|
self.numOfTables = tables
|
||||||
|
@ -36,8 +36,10 @@ class RestfulInsert:
|
||||||
for i in range(tablesPerThread):
|
for i in range(tablesPerThread):
|
||||||
tableID = threadID * tablesPerThread
|
tableID = threadID * tablesPerThread
|
||||||
name = 'beijing' if tableID % 2 == 0 else 'shanghai'
|
name = 'beijing' if tableID % 2 == 0 else 'shanghai'
|
||||||
data = "create table %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name)
|
data = "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name)
|
||||||
requests.post(self.url, data, headers = self.header)
|
response = requests.post(self.url, data, headers = self.header)
|
||||||
|
if response.status_code != 200:
|
||||||
|
print(response.content)
|
||||||
|
|
||||||
def insertData(self, threadID):
|
def insertData(self, threadID):
|
||||||
print("thread %d started" % threadID)
|
print("thread %d started" % threadID)
|
||||||
|
@ -50,7 +52,9 @@ class RestfulInsert:
|
||||||
values = []
|
values = []
|
||||||
for k in range(self.batchSize):
|
for k in range(self.batchSize):
|
||||||
data += "(%d, %d, %d, %d)" % (start + j * self.batchSize + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))
|
data += "(%d, %d, %d, %d)" % (start + j * self.batchSize + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))
|
||||||
requests.post(self.url, data, headers = self.header)
|
response = requests.post(self.url, data, headers = self.header)
|
||||||
|
if response.status_code != 200:
|
||||||
|
print(response.content)
|
||||||
|
|
||||||
def insertUnlimitedData(self, threadID):
|
def insertUnlimitedData(self, threadID):
|
||||||
print("thread %d started" % threadID)
|
print("thread %d started" % threadID)
|
||||||
|
@ -77,14 +81,14 @@ class RestfulInsert:
|
||||||
random.shuffle(values)
|
random.shuffle(values)
|
||||||
for k in range(len(values)):
|
for k in range(len(values)):
|
||||||
data += values[k]
|
data += values[k]
|
||||||
requests.post(self.url, data, headers = self.header)
|
response = requests.post(self.url, data, headers = self.header)
|
||||||
|
if response.status_code != 200:
|
||||||
|
print(response.content)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
data = "drop database if exists %s" % self.dbname
|
data = "create database if not exists %s" % self.dbname
|
||||||
requests.post(self.url, data, headers = self.header)
|
requests.post(self.url, data, headers = self.header)
|
||||||
data = "create database %s" % self.dbname
|
data = "create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname
|
||||||
requests.post(self.url, data, headers = self.header)
|
|
||||||
data = "create table %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname
|
|
||||||
requests.post(self.url, data, headers = self.header)
|
requests.post(self.url, data, headers = self.header)
|
||||||
|
|
||||||
threads = []
|
threads = []
|
||||||
|
@ -120,6 +124,13 @@ parser.add_argument(
|
||||||
default='127.0.0.1',
|
default='127.0.0.1',
|
||||||
type=str,
|
type=str,
|
||||||
help='host name to be connected (default: 127.0.0.1)')
|
help='host name to be connected (default: 127.0.0.1)')
|
||||||
|
parser.add_argument(
|
||||||
|
'-S',
|
||||||
|
'--start-timestamp',
|
||||||
|
action='store',
|
||||||
|
default=1500000000000,
|
||||||
|
type=int,
|
||||||
|
help='insert data from timestamp (default: 1500000000000)')
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'-d',
|
'-d',
|
||||||
'--db-name',
|
'--db-name',
|
||||||
|
@ -169,5 +180,5 @@ parser.add_argument(
|
||||||
help='The order of test data (default: False)')
|
help='The order of test data (default: False)')
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
ri = RestfulInsert(args.host_name, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order)
|
ri = RestfulInsert(args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order)
|
||||||
ri.run()
|
ri.run()
|
|
@ -0,0 +1,131 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/deploy.sh -n dnode2 -i 2
|
||||||
|
system sh/deploy.sh -n dnode3 -i 3
|
||||||
|
|
||||||
|
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
|
||||||
|
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
|
||||||
|
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
|
||||||
|
|
||||||
|
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
|
||||||
|
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
|
||||||
|
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
|
||||||
|
|
||||||
|
system sh/cfg.sh -n dnode1 -c http -v 0
|
||||||
|
system sh/cfg.sh -n dnode2 -c http -v 0
|
||||||
|
system sh/cfg.sh -n dnode3 -c http -v 0
|
||||||
|
|
||||||
|
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
|
||||||
|
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
|
||||||
|
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
|
||||||
|
|
||||||
|
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20
|
||||||
|
system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20
|
||||||
|
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20
|
||||||
|
|
||||||
|
system sh/cfg.sh -n dnode1 -c replica -v 3
|
||||||
|
system sh/cfg.sh -n dnode2 -c replica -v 3
|
||||||
|
system sh/cfg.sh -n dnode3 -c replica -v 3
|
||||||
|
|
||||||
|
print ============== deploy
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 5001
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql create dnode $hostname2
|
||||||
|
sql create dnode $hostname3
|
||||||
|
system sh/exec.sh -n dnode2 -s start
|
||||||
|
system sh/exec.sh -n dnode3 -s start
|
||||||
|
|
||||||
|
print =============== step1
|
||||||
|
$x = 0
|
||||||
|
show1:
|
||||||
|
$x = $x + 1
|
||||||
|
sleep 2000
|
||||||
|
if $x == 5 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql show mnodes -x show1
|
||||||
|
$mnode1Role = $data2_1
|
||||||
|
print mnode1Role $mnode1Role
|
||||||
|
$mnode2Role = $data2_2
|
||||||
|
print mnode2Role $mnode2Role
|
||||||
|
$mnode3Role = $data2_3
|
||||||
|
print mnode3Role $mnode3Role
|
||||||
|
|
||||||
|
if $mnode1Role != master then
|
||||||
|
goto show1
|
||||||
|
endi
|
||||||
|
if $mnode2Role != slave then
|
||||||
|
goto show1
|
||||||
|
endi
|
||||||
|
if $mnode3Role != slave then
|
||||||
|
goto show1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== step2
|
||||||
|
|
||||||
|
sql create database db replica 3
|
||||||
|
sql use db
|
||||||
|
sql create table tb (ts timestamp, test int)
|
||||||
|
|
||||||
|
$x = 0
|
||||||
|
while $x < 100
|
||||||
|
$ms = $x . s
|
||||||
|
sql insert into tb values (now + $ms , $x )
|
||||||
|
$x = $x + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
print =============== step3
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||||
|
|
||||||
|
print =============== step4
|
||||||
|
sleep 5000
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
system sh/exec.sh -n dnode2 -s start
|
||||||
|
system sh/exec.sh -n dnode3 -s start
|
||||||
|
|
||||||
|
print =============== step5
|
||||||
|
sleep 8000
|
||||||
|
while $x < 200
|
||||||
|
$ms = $x . s
|
||||||
|
sql insert into tb values (now + $ms , $x )
|
||||||
|
$x = $x + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
print =============== step6
|
||||||
|
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||||
|
sleep 3000
|
||||||
|
while $x < 300
|
||||||
|
$ms = $x . s
|
||||||
|
sql insert into tb values (now + $ms , $x )
|
||||||
|
$x = $x + 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode2 -s start
|
||||||
|
|
||||||
|
sleep 6000
|
||||||
|
print =============== step7
|
||||||
|
while $x < 400
|
||||||
|
$ms = $x . s
|
||||||
|
sql insert into tb values (now + $ms , $x )
|
||||||
|
$x = $x + 1
|
||||||
|
sleep 1
|
||||||
|
endw
|
||||||
|
|
||||||
|
print =============== step8
|
||||||
|
sql select * from tb
|
||||||
|
print rows $rows
|
||||||
|
if $rows != 400 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
|
@ -17,9 +17,9 @@ function runSimCaseOneByOne {
|
||||||
echo -e "${GREEN}$case success${NC}" | tee -a out.log || \
|
echo -e "${GREEN}$case success${NC}" | tee -a out.log || \
|
||||||
echo -e "${RED}$case failed${NC}" | tee -a out.log
|
echo -e "${RED}$case failed${NC}" | tee -a out.log
|
||||||
out_log=`tail -1 out.log `
|
out_log=`tail -1 out.log `
|
||||||
if [[ $out_log =~ 'failed' ]];then
|
# if [[ $out_log =~ 'failed' ]];then
|
||||||
exit 8
|
# exit 8
|
||||||
fi
|
# fi
|
||||||
end_time=`date +%s`
|
end_time=`date +%s`
|
||||||
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a out.log
|
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a out.log
|
||||||
fi
|
fi
|
||||||
|
@ -42,9 +42,9 @@ function runPyCaseOneByOne {
|
||||||
echo -e "${RED}$case failed${NC}" | tee -a pytest-out.log
|
echo -e "${RED}$case failed${NC}" | tee -a pytest-out.log
|
||||||
end_time=`date +%s`
|
end_time=`date +%s`
|
||||||
out_log=`tail -1 pytest-out.log `
|
out_log=`tail -1 pytest-out.log `
|
||||||
if [[ $out_log =~ 'failed' ]];then
|
# if [[ $out_log =~ 'failed' ]];then
|
||||||
exit 8
|
# exit 8
|
||||||
fi
|
# fi
|
||||||
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a pytest-out.log
|
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a pytest-out.log
|
||||||
else
|
else
|
||||||
$line > /dev/null 2>&1
|
$line > /dev/null 2>&1
|
||||||
|
|
|
@ -31,8 +31,8 @@ IF (TD_LINUX)
|
||||||
#add_executable(createTablePerformance createTablePerformance.c)
|
#add_executable(createTablePerformance createTablePerformance.c)
|
||||||
#target_link_libraries(createTablePerformance taos_static tutil common pthread)
|
#target_link_libraries(createTablePerformance taos_static tutil common pthread)
|
||||||
|
|
||||||
#add_executable(createNormalTable createNormalTable.c)
|
add_executable(createNormalTable createNormalTable.c)
|
||||||
#target_link_libraries(createNormalTable taos_static tutil common pthread)
|
target_link_libraries(createNormalTable taos_static tutil common pthread)
|
||||||
|
|
||||||
#add_executable(queryPerformance queryPerformance.c)
|
#add_executable(queryPerformance queryPerformance.c)
|
||||||
#target_link_libraries(queryPerformance taos_static tutil common pthread)
|
#target_link_libraries(queryPerformance taos_static tutil common pthread)
|
||||||
|
@ -45,5 +45,8 @@ IF (TD_LINUX)
|
||||||
|
|
||||||
#add_executable(invalidTableId invalidTableId.c)
|
#add_executable(invalidTableId invalidTableId.c)
|
||||||
#target_link_libraries(invalidTableId taos_static tutil common pthread)
|
#target_link_libraries(invalidTableId taos_static tutil common pthread)
|
||||||
|
|
||||||
|
add_executable(hashIterator hashIterator.c)
|
||||||
|
target_link_libraries(hashIterator taos_static tutil common pthread)
|
||||||
ENDIF()
|
ENDIF()
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "os.h"
|
||||||
|
#include "taos.h"
|
||||||
|
#include "tulog.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "hash.h"
|
||||||
|
|
||||||
|
typedef struct HashTestRow {
|
||||||
|
int32_t keySize;
|
||||||
|
char key[100];
|
||||||
|
} HashTestRow;
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
void * hashHandle = taosHashInit(100, hashFp, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
|
pPrint("insert 3 rows to hash");
|
||||||
|
for (int32_t t = 0; t < 3; ++t) {
|
||||||
|
HashTestRow row = {0};
|
||||||
|
row.keySize = sprintf(row.key, "0.db.st%d", t);
|
||||||
|
|
||||||
|
taosHashPut(hashHandle, row.key, row.keySize, &row, sizeof(HashTestRow));
|
||||||
|
}
|
||||||
|
|
||||||
|
pPrint("start iterator");
|
||||||
|
HashTestRow *row = taosHashIterate(hashHandle, NULL);
|
||||||
|
while (row) {
|
||||||
|
pPrint("drop key:%s", row->key);
|
||||||
|
taosHashRemove(hashHandle, row->key, row->keySize);
|
||||||
|
|
||||||
|
pPrint("get rows from hash");
|
||||||
|
for (int32_t t = 0; t < 3; ++t) {
|
||||||
|
HashTestRow r = {0};
|
||||||
|
r.keySize = sprintf(r.key, "0.db.st%d", t);
|
||||||
|
|
||||||
|
void *result = taosHashGet(hashHandle, r.key, r.keySize);
|
||||||
|
pPrint("get key:%s result:%p", r.key, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Before getting the next iterator, the object just deleted can be obtained
|
||||||
|
row = taosHashIterate(hashHandle, row);
|
||||||
|
}
|
||||||
|
|
||||||
|
pPrint("stop iterator");
|
||||||
|
taosHashCancelIterate(hashHandle, row);
|
||||||
|
|
||||||
|
pPrint("get rows from hash");
|
||||||
|
for (int32_t t = 0; t < 3; ++t) {
|
||||||
|
HashTestRow r = {0};
|
||||||
|
r.keySize = sprintf(r.key, "0.db.st%d", t);
|
||||||
|
|
||||||
|
void *result = taosHashGet(hashHandle, r.key, r.keySize);
|
||||||
|
pPrint("get key:%s result:%p", r.key, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue