Merge branch '3.0' into feature/TD-11274-3.0
This commit is contained in:
commit
23a0318604
30
Jenkinsfile2
30
Jenkinsfile2
|
@ -127,6 +127,25 @@ def pre_test(){
|
|||
'''
|
||||
return 1
|
||||
}
|
||||
def pre_test_build_mac() {
|
||||
sh '''
|
||||
hostname
|
||||
date
|
||||
'''
|
||||
sh '''
|
||||
cd ${WK}
|
||||
rm -rf debug
|
||||
mkdir debug
|
||||
'''
|
||||
sh '''
|
||||
cd ${WK}/debug
|
||||
cmake ..
|
||||
make -j8
|
||||
'''
|
||||
sh '''
|
||||
date
|
||||
'''
|
||||
}
|
||||
def pre_test_win(){
|
||||
bat '''
|
||||
hostname
|
||||
|
@ -334,6 +353,17 @@ pipeline {
|
|||
}
|
||||
}
|
||||
}
|
||||
stage('mac test') {
|
||||
agent{label " Mac_catalina "}
|
||||
steps {
|
||||
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
|
||||
timeout(time: 20, unit: 'MINUTES'){
|
||||
pre_test()
|
||||
pre_test_build_mac()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
stage('linux test') {
|
||||
agent{label " worker03 || slave215 || slave217 || slave219 "}
|
||||
options { skipDefaultCheckout() }
|
||||
|
|
|
@ -87,8 +87,8 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
|
|||
static void dmCleanupProcQueue(SProcQueue *queue) {}
|
||||
|
||||
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||
const void *pHead = pMsg;
|
||||
const void *pBody = pMsg->pCont;
|
||||
const void * pHead = pMsg;
|
||||
const void * pBody = pMsg->pCont;
|
||||
const int16_t rawHeadLen = sizeof(SRpcMsg);
|
||||
const int32_t rawBodyLen = pMsg->contLen;
|
||||
const int16_t headLen = CEIL8(rawHeadLen);
|
||||
|
@ -257,7 +257,7 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
|||
proc->wrapper = pWrapper;
|
||||
proc->name = pWrapper->name;
|
||||
|
||||
SShm *shm = &proc->shm;
|
||||
SShm * shm = &proc->shm;
|
||||
int32_t cstart = 0;
|
||||
int32_t csize = CEIL8(shm->size / 2);
|
||||
int32_t pstart = csize;
|
||||
|
@ -281,13 +281,13 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper) {
|
|||
}
|
||||
|
||||
static void *dmConsumChildQueue(void *param) {
|
||||
SProc *proc = param;
|
||||
SProc * proc = param;
|
||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||
SProcQueue *queue = proc->cqueue;
|
||||
SProcQueue * queue = proc->cqueue;
|
||||
int32_t numOfMsgs = 0;
|
||||
int32_t code = 0;
|
||||
EProcFuncType ftype = DND_FUNC_REQ;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
|
||||
dDebug("node:%s, start to consume from cqueue", proc->name);
|
||||
do {
|
||||
|
@ -324,13 +324,13 @@ static void *dmConsumChildQueue(void *param) {
|
|||
}
|
||||
|
||||
static void *dmConsumParentQueue(void *param) {
|
||||
SProc *proc = param;
|
||||
SProc * proc = param;
|
||||
SMgmtWrapper *pWrapper = proc->wrapper;
|
||||
SProcQueue *queue = proc->pqueue;
|
||||
SProcQueue * queue = proc->pqueue;
|
||||
int32_t numOfMsgs = 0;
|
||||
int32_t code = 0;
|
||||
EProcFuncType ftype = DND_FUNC_REQ;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
|
||||
dDebug("node:%s, start to consume from pqueue", proc->name);
|
||||
do {
|
||||
|
@ -353,7 +353,7 @@ static void *dmConsumParentQueue(void *param) {
|
|||
rpcRegisterBrokenLinkArg(pMsg);
|
||||
} else if (ftype == DND_FUNC_RELEASE) {
|
||||
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
|
||||
rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
|
||||
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
|
||||
} else {
|
||||
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
|
|
@ -245,7 +245,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
|||
SRpcMsg msg = {.code = type, .info = *pHandle};
|
||||
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
|
||||
} else {
|
||||
rpcReleaseHandle(pHandle->handle, type);
|
||||
rpcReleaseHandle(pHandle, type);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -262,13 +262,17 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
|
||||
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
||||
|
||||
#define EPSET_IS_VALID(epSet) ((epSet) != NULL && (epSet)->numOfEps != 0)
|
||||
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
|
||||
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
|
||||
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
|
||||
#define EPSET_FORWARD_INUSE(epSet) \
|
||||
do { \
|
||||
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
|
||||
#define EPSET_FORWARD_INUSE(epSet) \
|
||||
do { \
|
||||
if ((epSet)->numOfEps != 0) { \
|
||||
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define EPSET_DEBUG_STR(epSet, tbuf) \
|
||||
do { \
|
||||
int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \
|
||||
|
@ -512,7 +516,6 @@ static void allocConnRef(SCliConn* conn, bool update) {
|
|||
}
|
||||
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||
if (conn->status == ConnInPool) {
|
||||
// assert(0);
|
||||
return;
|
||||
}
|
||||
SCliThrd* thrd = conn->hostThrd;
|
||||
|
@ -668,7 +671,6 @@ static void cliSendCb(uv_write_t* req, int status) {
|
|||
void cliSend(SCliConn* pConn) {
|
||||
CONN_HANDLE_BROKEN(pConn);
|
||||
|
||||
// assert(taosArrayGetSize(pConn->cliMsgs) > 0);
|
||||
assert(!transQueueEmpty(&pConn->cliMsgs));
|
||||
|
||||
SCliMsg* pCliMsg = NULL;
|
||||
|
@ -810,6 +812,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
||||
if (!EPSET_IS_VALID(&pCtx->epSet)) {
|
||||
destroyCmsg(pMsg);
|
||||
tError("invalid epset");
|
||||
return;
|
||||
}
|
||||
|
||||
bool ignore = false;
|
||||
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
|
||||
|
@ -1077,12 +1084,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
} else {
|
||||
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||
addConnToPool(pThrd->pool, pConn);
|
||||
if (pResp->contLen == 0) {
|
||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||
} else {
|
||||
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet);
|
||||
if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet) < 0) {
|
||||
tError("%s conn %p failed to deserialize epset", CONN_GET_INST_LABEL(pConn));
|
||||
}
|
||||
}
|
||||
addConnToPool(pThrd->pool, pConn);
|
||||
transFreeMsg(pResp->pCont);
|
||||
cliSchedMsgToNextNode(pMsg, pThrd);
|
||||
return -1;
|
||||
|
|
|
@ -1029,8 +1029,9 @@ void transUnrefSrvHandle(void* handle) {
|
|||
}
|
||||
|
||||
void transReleaseSrvHandle(void* handle) {
|
||||
SExHandle* exh = handle;
|
||||
int64_t refId = exh->refId;
|
||||
SRpcHandleInfo* info = handle;
|
||||
SExHandle* exh = info->handle;
|
||||
int64_t refId = info->refId;
|
||||
|
||||
ASYNC_CHECK_HANDLE(exh, refId);
|
||||
|
||||
|
|
|
@ -175,7 +175,7 @@ static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)
|
|||
rpcMsg.code = 0;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
|
||||
rpcReleaseHandle(pMsg->info.handle, TAOS_CONN_SERVER);
|
||||
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
|
||||
}
|
||||
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
{
|
||||
|
|
|
@ -0,0 +1,442 @@
|
|||
import datetime
|
||||
import re
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Any, Tuple
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.constant import *
|
||||
|
||||
PRIMARY_COL = "ts"
|
||||
|
||||
INT_COL = "c_int"
|
||||
BINT_COL = "c_bint"
|
||||
SINT_COL = "c_sint"
|
||||
TINT_COL = "c_tint"
|
||||
FLOAT_COL = "c_float"
|
||||
DOUBLE_COL = "c_double"
|
||||
BOOL_COL = "c_bool"
|
||||
TINT_UN_COL = "c_utint"
|
||||
SINT_UN_COL = "c_usint"
|
||||
BINT_UN_COL = "c_ubint"
|
||||
INT_UN_COL = "c_uint"
|
||||
BINARY_COL = "c_binary"
|
||||
NCHAR_COL = "c_nchar"
|
||||
TS_COL = "c_ts"
|
||||
|
||||
NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
|
||||
CHAR_COL = [BINARY_COL, NCHAR_COL, ]
|
||||
BOOLEAN_COL = [BOOL_COL, ]
|
||||
TS_TYPE_COL = [TS_COL, ]
|
||||
|
||||
INT_TAG = "t_int"
|
||||
|
||||
ALL_COL = [PRIMARY_COL, INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BINARY_COL, NCHAR_COL, BOOL_COL, TS_COL]
|
||||
TAG_COL = [INT_TAG]
|
||||
|
||||
# insert data args:
|
||||
TIME_STEP = 10000
|
||||
NOW = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
|
||||
|
||||
# init db/table
|
||||
DBNAME = "db"
|
||||
STBNAME = "stb1"
|
||||
CTBNAME = "ct1"
|
||||
NTBNAME = "nt1"
|
||||
|
||||
|
||||
@dataclass
|
||||
class DataSet:
|
||||
ts_data : List[int] = field(default_factory=list)
|
||||
int_data : List[int] = field(default_factory=list)
|
||||
bint_data : List[int] = field(default_factory=list)
|
||||
sint_data : List[int] = field(default_factory=list)
|
||||
tint_data : List[int] = field(default_factory=list)
|
||||
int_un_data : List[int] = field(default_factory=list)
|
||||
bint_un_data: List[int] = field(default_factory=list)
|
||||
sint_un_data: List[int] = field(default_factory=list)
|
||||
tint_un_data: List[int] = field(default_factory=list)
|
||||
float_data : List[float] = field(default_factory=list)
|
||||
double_data : List[float] = field(default_factory=list)
|
||||
bool_data : List[int] = field(default_factory=list)
|
||||
binary_data : List[str] = field(default_factory=list)
|
||||
nchar_data : List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BSMAschema:
|
||||
creation : str = "CREATE"
|
||||
tb_type : str = "stable"
|
||||
tbname : str = STBNAME
|
||||
cols : Tuple[str] = None
|
||||
tags : Tuple[str] = None
|
||||
sma_flag : str = "SMA"
|
||||
sma_cols : Tuple[str] = None
|
||||
create_tabel_sql : str = None
|
||||
other : Any = None
|
||||
|
||||
drop : str = "DROP"
|
||||
drop_flag : str = "INDEX"
|
||||
querySmaOptimize : int = 1
|
||||
show : str = "SHOW"
|
||||
show_msg : str = "INDEXES"
|
||||
show_oper : str = "FROM"
|
||||
dbname : str = None
|
||||
rollup_db : bool = False
|
||||
|
||||
def __post_init__(self):
|
||||
if isinstance(self.other, dict):
|
||||
for k,v in self.other.items():
|
||||
|
||||
if k.lower() == "tbname" and isinstance(v, str) and not self.tbname:
|
||||
self.tbname = v
|
||||
del self.other[k]
|
||||
|
||||
if k.lower() == "cols" and (isinstance(v, tuple) or isinstance(v, list)) and not self.cols:
|
||||
self.cols = v
|
||||
del self.other[k]
|
||||
|
||||
if k.lower() == "tags" and (isinstance(v, tuple) or isinstance(v, list)) and not self.tags:
|
||||
self.tags = v
|
||||
del self.other[k]
|
||||
|
||||
if k.lower() == "sma_flag" and isinstance(v, str) and not self.sma_flag:
|
||||
self.sma_flag = v
|
||||
del self.other[k]
|
||||
|
||||
if k.lower() == "sma_cols" and (isinstance(v, tuple) or isinstance(v, list)) and not self.sma_cols:
|
||||
self.sma_cols = v
|
||||
del self.other[k]
|
||||
|
||||
if k.lower() == "create_tabel_sql" and isinstance(v, str) and not self.create_tabel_sql:
|
||||
self.create_tabel_sql = v
|
||||
del self.other[k]
|
||||
|
||||
# bSma show and drop operator is not completed
|
||||
if k.lower() == "drop_flag" and isinstance(v, str) and not self.drop_flag:
|
||||
self.drop_flag = v
|
||||
del self.other[k]
|
||||
|
||||
if k.lower() == "show_msg" and isinstance(v, str) and not self.show_msg:
|
||||
self.show_msg = v
|
||||
del self.other[k]
|
||||
|
||||
if k.lower() == "dbname" and isinstance(v, str) and not self.dbname:
|
||||
self.dbname = v
|
||||
del self.other[k]
|
||||
|
||||
if k.lower() == "show_oper" and isinstance(v, str) and not self.show_oper:
|
||||
self.show_oper = v
|
||||
del self.other[k]
|
||||
|
||||
if k.lower() == "rollup_db" and isinstance(v, bool) and not self.rollup_db:
|
||||
self.rollup_db = v
|
||||
del self.other[k]
|
||||
|
||||
|
||||
|
||||
# from ...pytest.util.sql import *
|
||||
# from ...pytest.util.constant import *
|
||||
|
||||
class TDTestCase:
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
self.precision = "ms"
|
||||
self.sma_count = 0
|
||||
self.sma_created_index = []
|
||||
|
||||
def __create_sma_index(self, sma:BSMAschema):
|
||||
if sma.create_tabel_sql:
|
||||
sql = sma.create_tabel_sql
|
||||
else:
|
||||
sql = f"{sma.creation} {sma.tb_type} {sma.tbname} ({', '.join(sma.cols)}) "
|
||||
|
||||
if sma.tb_type == "stable" or (sma.tb_type=="table" and sma.tags):
|
||||
sql = f"{sma.creation} {sma.tb_type} {sma.tbname} ({', '.join(sma.cols)}) tags ({', '.join(sma.tags)}) "
|
||||
|
||||
|
||||
if sma.sma_flag:
|
||||
sql += sma.sma_flag
|
||||
if sma.sma_cols:
|
||||
sql += f"({', '.join(sma.sma_cols)})"
|
||||
|
||||
if isinstance(sma.other, dict):
|
||||
for k,v in sma.other.items():
|
||||
if isinstance(v,tuple) or isinstance(v, list):
|
||||
sql += f" {k} ({' '.join(v)})"
|
||||
else:
|
||||
sql += f" {k} {v}"
|
||||
if isinstance(sma.other, tuple) or isinstance(sma.other, list):
|
||||
sql += " ".join(sma.other)
|
||||
if isinstance(sma.other, int) or isinstance(sma.other, float) or isinstance(sma.other, str):
|
||||
sql += f" {sma.other}"
|
||||
|
||||
return sql
|
||||
|
||||
def __get_bsma_table_col_tag_str(self, sql:str):
|
||||
p = re.compile(r"[(](.*)[)]", re.S)
|
||||
|
||||
if "tags" in (col_str := sql):
|
||||
col_str = re.findall(p, sql.split("tags")[0])[0].split(",")
|
||||
if (tag_str := re.findall(p, sql.split("tags")[1])[0].split(",") ):
|
||||
col_str.extend(tag_str)
|
||||
|
||||
return col_str
|
||||
|
||||
def __get_bsma_col_tag_names(self, col_tags:list):
|
||||
return [ col_tag.strip().split(" ")[0] for col_tag in col_tags ]
|
||||
|
||||
@property
|
||||
def __get_db_tbname(self):
|
||||
tb_list = []
|
||||
tdSql.query("show tables")
|
||||
for row in tdSql.queryResult:
|
||||
tb_list.append(row[0])
|
||||
tdSql.query("show tables")
|
||||
for row in tdSql.queryResult:
|
||||
tb_list.append(row[0])
|
||||
|
||||
return tb_list
|
||||
|
||||
def __bsma_create_check(self, sma:BSMAschema):
|
||||
if not sma.creation:
|
||||
return False
|
||||
if not sma.create_tabel_sql and (not sma.tbname or not sma.tb_type or not sma.cols):
|
||||
return False
|
||||
if not sma.create_tabel_sql and (sma.tb_type == "stable" and not sma.tags):
|
||||
return False
|
||||
if not sma.sma_flag or not isinstance(sma.sma_flag, str) or sma.sma_flag.upper() != "SMA":
|
||||
return False
|
||||
if sma.tbname in self.__get_db_tbname:
|
||||
return False
|
||||
|
||||
if sma.create_tabel_sql:
|
||||
col_tag_list = self.__get_bsma_col_tag_names(self.__get_bsma_table_col_tag_str(sma.create_tabel_sql))
|
||||
else:
|
||||
col_str = list(sma.cols)
|
||||
if sma.tags:
|
||||
col_str.extend(list(sma.tags))
|
||||
col_tag_list = self.__get_bsma_col_tag_names(col_str)
|
||||
if not sma.sma_cols:
|
||||
return False
|
||||
for col in sma.sma_cols:
|
||||
if col not in col_tag_list:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def bsma_create_check(self, sma:BSMAschema):
|
||||
if self.__bsma_create_check(sma):
|
||||
tdSql.query(self.__create_sma_index(sma))
|
||||
tdLog.info(f"current sql: {self.__create_sma_index(sma)}")
|
||||
|
||||
else:
|
||||
tdSql.error(self.__create_sma_index(sma))
|
||||
|
||||
|
||||
def __sma_drop_check(self, sma:BSMAschema):
|
||||
pass
|
||||
|
||||
def sma_drop_check(self, sma:BSMAschema):
|
||||
pass
|
||||
|
||||
def __show_sma_index(self, sma:BSMAschema):
|
||||
pass
|
||||
|
||||
def __sma_show_check(self, sma:BSMAschema):
|
||||
pass
|
||||
|
||||
def sma_show_check(self, sma:BSMAschema):
|
||||
pass
|
||||
|
||||
@property
|
||||
def __create_sma_sql(self):
|
||||
err_sqls = []
|
||||
cur_sqls = []
|
||||
# err_set
|
||||
### case 1: required fields check
|
||||
err_sqls.append( BSMAschema(creation="", tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=(PRIMARY_COL, INT_COL ) ) )
|
||||
err_sqls.append( BSMAschema(tbname="", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=(PRIMARY_COL, INT_COL ) ) )
|
||||
err_sqls.append( BSMAschema(tbname="stb2", cols=(), tags=(f"{INT_TAG} int",), sma_cols=(PRIMARY_COL, INT_COL ) ) )
|
||||
err_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(), sma_cols=(PRIMARY_COL, INT_COL ) ) )
|
||||
err_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_flag="", sma_cols=(PRIMARY_COL, INT_COL ) ) )
|
||||
err_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=() ) )
|
||||
### case 2:
|
||||
err_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=({BINT_COL}) ) )
|
||||
|
||||
# current_set
|
||||
cur_sqls.append( BSMAschema(tbname="stb2", cols=(f"{PRIMARY_COL} timestamp", f"{INT_COL} int"), tags=(f"{INT_TAG} int",), sma_cols=(PRIMARY_COL, INT_COL ) ) )
|
||||
|
||||
return err_sqls, cur_sqls
|
||||
|
||||
def test_create_sma(self):
|
||||
err_sqls , cur_sqls = self.__create_sma_sql
|
||||
for err_sql in err_sqls:
|
||||
self.bsma_create_check(err_sql)
|
||||
for cur_sql in cur_sqls:
|
||||
self.bsma_create_check(cur_sql)
|
||||
|
||||
@property
|
||||
def __drop_sma_sql(self):
|
||||
err_sqls = []
|
||||
cur_sqls = []
|
||||
# err_set
|
||||
## case 1: required fields check
|
||||
return err_sqls, cur_sqls
|
||||
|
||||
def test_drop_sma(self):
|
||||
err_sqls , cur_sqls = self.__drop_sma_sql
|
||||
for err_sql in err_sqls:
|
||||
self.sma_drop_check(err_sql)
|
||||
for cur_sql in cur_sqls:
|
||||
self.sma_drop_check(cur_sql)
|
||||
|
||||
def all_test(self):
|
||||
self.test_create_sma()
|
||||
|
||||
def __create_tb(self):
|
||||
tdLog.printNoPrefix("==========step: create table")
|
||||
create_stb_sql = f'''create table {STBNAME}(
|
||||
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
|
||||
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
|
||||
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
|
||||
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
|
||||
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
|
||||
) tags ({INT_TAG} int)
|
||||
'''
|
||||
create_ntb_sql = f'''create table {NTBNAME}(
|
||||
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
|
||||
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
|
||||
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
|
||||
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
|
||||
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
|
||||
)
|
||||
'''
|
||||
tdSql.execute(create_stb_sql)
|
||||
tdSql.execute(create_ntb_sql)
|
||||
|
||||
for i in range(4):
|
||||
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
|
||||
|
||||
def __data_set(self, rows):
|
||||
data_set = DataSet()
|
||||
|
||||
for i in range(rows):
|
||||
data_set.ts_data.append(NOW + 1 * (rows - i))
|
||||
data_set.int_data.append(rows - i)
|
||||
data_set.bint_data.append(11111 * (rows - i))
|
||||
data_set.sint_data.append(111 * (rows - i) % 32767)
|
||||
data_set.tint_data.append(11 * (rows - i) % 127)
|
||||
data_set.int_un_data.append(rows - i)
|
||||
data_set.bint_un_data.append(11111 * (rows - i))
|
||||
data_set.sint_un_data.append(111 * (rows - i) % 32767)
|
||||
data_set.tint_un_data.append(11 * (rows - i) % 127)
|
||||
data_set.float_data.append(1.11 * (rows - i))
|
||||
data_set.double_data.append(1100.0011 * (rows - i))
|
||||
data_set.bool_data.append((rows - i) % 2)
|
||||
data_set.binary_data.append(f'binary{(rows - i)}')
|
||||
data_set.nchar_data.append(f'nchar_测试_{(rows - i)}')
|
||||
|
||||
return data_set
|
||||
|
||||
def __insert_data(self):
|
||||
tdLog.printNoPrefix("==========step: start inser data into tables now.....")
|
||||
data = self.__data_set(rows=self.rows)
|
||||
|
||||
# now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
|
||||
null_data = '''null, null, null, null, null, null, null, null, null, null, null, null, null, null'''
|
||||
zero_data = "0, 0, 0, 0, 0, 0, 0, 'binary_0', 'nchar_0', 0, 0, 0, 0, 0"
|
||||
|
||||
for i in range(self.rows):
|
||||
row_data = f'''
|
||||
{data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
|
||||
{data.bool_data[i]}, '{data.binary_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.tint_un_data[i]},
|
||||
{data.sint_un_data[i]}, {data.int_un_data[i]}, {data.bint_un_data[i]}
|
||||
'''
|
||||
neg_row_data = f'''
|
||||
{-1 * data.int_data[i]}, {-1 * data.bint_data[i]}, {-1 * data.sint_data[i]}, {-1 * data.tint_data[i]}, {-1 * data.float_data[i]}, {-1 * data.double_data[i]},
|
||||
{data.bool_data[i]}, '{data.binary_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {1 * data.tint_un_data[i]},
|
||||
{1 * data.sint_un_data[i]}, {1 * data.int_un_data[i]}, {1 * data.bint_un_data[i]}
|
||||
'''
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into ct1 values ( {NOW - i * TIME_STEP}, {row_data} )")
|
||||
tdSql.execute(
|
||||
f"insert into ct2 values ( {NOW - i * int(TIME_STEP * 0.6)}, {neg_row_data} )")
|
||||
tdSql.execute(
|
||||
f"insert into ct4 values ( {NOW - i * int(TIME_STEP * 0.8) }, {row_data} )")
|
||||
tdSql.execute(
|
||||
f"insert into {NTBNAME} values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )")
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into ct2 values ( {NOW + int(TIME_STEP * 0.6)}, {null_data} )")
|
||||
tdSql.execute(
|
||||
f"insert into ct2 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.6)}, {null_data} )")
|
||||
tdSql.execute(
|
||||
f"insert into ct2 values ( {NOW - self.rows * int(TIME_STEP * 0.29) }, {null_data} )")
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into ct4 values ( {NOW + int(TIME_STEP * 0.8)}, {null_data} )")
|
||||
tdSql.execute(
|
||||
f"insert into ct4 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.8)}, {null_data} )")
|
||||
tdSql.execute(
|
||||
f"insert into ct4 values ( {NOW - self.rows * int(TIME_STEP * 0.39)}, {null_data} )")
|
||||
|
||||
tdSql.execute(
|
||||
f"insert into {NTBNAME} values ( {NOW + int(TIME_STEP * 1.2)}, {null_data} )")
|
||||
tdSql.execute(
|
||||
f"insert into {NTBNAME} values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )")
|
||||
tdSql.execute(
|
||||
f"insert into {NTBNAME} values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )")
|
||||
|
||||
def run(self):
|
||||
self.rows = 10
|
||||
|
||||
tdLog.printNoPrefix("==========step0:all check")
|
||||
|
||||
tdLog.printNoPrefix("==========step1:create table in normal database")
|
||||
tdSql.prepare()
|
||||
self.__create_tb()
|
||||
self.__insert_data()
|
||||
self.all_test()
|
||||
|
||||
# drop databases, create same name db、stb and sma index
|
||||
tdSql.prepare()
|
||||
self.__create_tb()
|
||||
self.__insert_data()
|
||||
self.all_test()
|
||||
|
||||
tdLog.printNoPrefix("==========step2:create table in rollup database")
|
||||
tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m")
|
||||
tdSql.execute("use db3")
|
||||
tdSql.query(f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL})")
|
||||
|
||||
tdSql.execute("drop database if exists db1 ")
|
||||
tdSql.execute("drop database if exists db2 ")
|
||||
|
||||
tdDnodes.stop(1)
|
||||
tdDnodes.start(1)
|
||||
|
||||
tdLog.printNoPrefix("==========step4:after wal, all check again ")
|
||||
tdSql.prepare()
|
||||
self.__create_tb()
|
||||
self.__insert_data()
|
||||
self.all_test()
|
||||
|
||||
# drop databases, create same name db、stb and sma index
|
||||
tdSql.prepare()
|
||||
self.__create_tb()
|
||||
self.__insert_data()
|
||||
self.all_test()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -1,6 +1,6 @@
|
|||
import datetime
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
|
@ -36,36 +36,20 @@ NOW = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
|
|||
|
||||
@dataclass
|
||||
class DataSet:
|
||||
ts_data : List[int] = None
|
||||
int_data : List[int] = None
|
||||
bint_data : List[int] = None
|
||||
sint_data : List[int] = None
|
||||
tint_data : List[int] = None
|
||||
int_un_data : List[int] = None
|
||||
bint_un_data : List[int] = None
|
||||
sint_un_data : List[int] = None
|
||||
tint_un_data : List[int] = None
|
||||
float_data : List[float] = None
|
||||
double_data : List[float] = None
|
||||
bool_data : List[int] = None
|
||||
binary_data : List[str] = None
|
||||
nchar_data : List[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
self.ts_data = []
|
||||
self.int_data = []
|
||||
self.bint_data = []
|
||||
self.sint_data = []
|
||||
self.tint_data = []
|
||||
self.int_un_data = []
|
||||
self.bint_un_data = []
|
||||
self.sint_un_data = []
|
||||
self.tint_un_data = []
|
||||
self.float_data = []
|
||||
self.double_data = []
|
||||
self.bool_data = []
|
||||
self.binary_data = []
|
||||
self.nchar_data = []
|
||||
ts_data : List[int] = field(default_factory=list)
|
||||
int_data : List[int] = field(default_factory=list)
|
||||
bint_data : List[int] = field(default_factory=list)
|
||||
sint_data : List[int] = field(default_factory=list)
|
||||
tint_data : List[int] = field(default_factory=list)
|
||||
int_un_data : List[int] = field(default_factory=list)
|
||||
bint_un_data: List[int] = field(default_factory=list)
|
||||
sint_un_data: List[int] = field(default_factory=list)
|
||||
tint_un_data: List[int] = field(default_factory=list)
|
||||
float_data : List[float] = field(default_factory=list)
|
||||
double_data : List[float] = field(default_factory=list)
|
||||
bool_data : List[int] = field(default_factory=list)
|
||||
binary_data : List[str] = field(default_factory=list)
|
||||
nchar_data : List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
|
@ -107,15 +91,15 @@ class TDTestCase:
|
|||
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(count) watermark 1min",
|
||||
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay -1s",
|
||||
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark -1m",
|
||||
# f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ",
|
||||
# f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ",
|
||||
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ",
|
||||
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ",
|
||||
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} binary(16)) tags (tag1 int) rollup(avg) watermark 1s",
|
||||
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m",
|
||||
# f"create table ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s",
|
||||
# f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) tags (tag1 int) " ,
|
||||
# f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " ,
|
||||
# f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) " ,
|
||||
# f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " ,
|
||||
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m",
|
||||
# f"create table ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s",
|
||||
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) " ,
|
||||
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " ,
|
||||
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) " ,
|
||||
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " ,
|
||||
|
||||
# watermark, max_delay: [0, 900000], [ms, s, m, ?]
|
||||
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1u",
|
||||
|
@ -136,8 +120,9 @@ class TDTestCase:
|
|||
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 5s max_delay 1m",
|
||||
f"create stable stb3 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(max) watermark 5s max_delay 1m",
|
||||
f"create stable stb4 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(sum) watermark 5s max_delay 1m",
|
||||
# f"create stable stb5 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(last) watermark 5s max_delay 1m",
|
||||
# f"create stable stb6 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m",
|
||||
f"create stable stb5 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(last) watermark 5s max_delay 1m",
|
||||
f"create stable stb6 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m",
|
||||
f"create stable stb7 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL})",
|
||||
]
|
||||
|
||||
def test_create_stb(self):
|
||||
|
@ -150,7 +135,7 @@ class TDTestCase:
|
|||
# assert "rollup" in tdSql.description
|
||||
tdSql.checkRows(len(self.create_stable_sql_current))
|
||||
|
||||
# tdSql.execute("use db") # because db is a noraml database, not a rollup database, should not be able to create a rollup database
|
||||
tdSql.execute("use db") # because db is a noraml database, not a rollup database, should not be able to create a rollup stable
|
||||
# tdSql.error(f"create stable nor_db_rollup_stb ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 5s max_delay 1m")
|
||||
|
||||
|
||||
|
@ -210,20 +195,6 @@ class TDTestCase:
|
|||
data_set.binary_data.append(f'binary{(rows - i)}')
|
||||
data_set.nchar_data.append(f'nchar_测试_{(rows - i)}')
|
||||
|
||||
# neg_data_set.ts_data.append(-1 * i)
|
||||
# neg_data_set.int_data.append(-i)
|
||||
# neg_data_set.bint_data.append(-11111 * i)
|
||||
# neg_data_set.sint_data.append(-111 * i % 32767)
|
||||
# neg_data_set.tint_data.append(-11 * i % 127)
|
||||
# neg_data_set.int_un_data.append(-i)
|
||||
# neg_data_set.bint_un_data.append(-11111 * i)
|
||||
# neg_data_set.sint_un_data.append(-111 * i % 32767)
|
||||
# neg_data_set.tint_un_data.append(-11 * i % 127)
|
||||
# neg_data_set.float_data.append(-1.11 * i)
|
||||
# neg_data_set.double_data.append(-1100.0011 * i)
|
||||
# neg_data_set.binary_data.append(f'binary{i}')
|
||||
# neg_data_set.nchar_data.append(f'nchar_测试_{i}')
|
||||
|
||||
return data_set
|
||||
|
||||
def __insert_data(self):
|
||||
|
@ -279,9 +250,14 @@ class TDTestCase:
|
|||
|
||||
tdLog.printNoPrefix("==========step2:create table in rollup database")
|
||||
tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m")
|
||||
|
||||
tdSql.execute("drop database if exists db1 ")
|
||||
tdSql.execute("drop database if exists db2 ")
|
||||
|
||||
tdSql.execute("use db3")
|
||||
self.__create_tb()
|
||||
self.__insert_data()
|
||||
# self.__create_tb()
|
||||
# self.__insert_data()
|
||||
self.all_test()
|
||||
|
||||
tdSql.execute("drop database if exists db1 ")
|
||||
tdSql.execute("drop database if exists db2 ")
|
||||
|
|
|
@ -325,7 +325,7 @@ class TDTestCase:
|
|||
def __sma_create_check(self, sma:SMAschema):
|
||||
if self.updatecfgDict["querySmaOptimize"] == 0:
|
||||
return False
|
||||
# # TODO: if database is a rollup-db, can not create sma index
|
||||
# TODO: if database is a rollup-db, can not create sma index
|
||||
# tdSql.query("select database()")
|
||||
# if sma.rollup_db :
|
||||
# return False
|
||||
|
@ -493,8 +493,8 @@ class TDTestCase:
|
|||
err_sqls , cur_sqls = self.__drop_sma_sql
|
||||
for err_sql in err_sqls:
|
||||
self.sma_drop_check(err_sql)
|
||||
# for cur_sql in cur_sqls:
|
||||
# self.sma_drop_check(cur_sql)
|
||||
for cur_sql in cur_sqls:
|
||||
self.sma_drop_check(cur_sql)
|
||||
|
||||
def all_test(self):
|
||||
self.test_create_sma()
|
||||
|
@ -605,24 +605,23 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix("==========step1:create table in normal database")
|
||||
tdSql.prepare()
|
||||
self.__create_tb()
|
||||
# self.__insert_data()
|
||||
self.__insert_data()
|
||||
self.all_test()
|
||||
|
||||
# drop databases, create same name db、stb and sma index
|
||||
# tdSql.prepare()
|
||||
# self.__create_tb()
|
||||
# self.__insert_data()
|
||||
# self.all_test()
|
||||
|
||||
|
||||
|
||||
return
|
||||
tdSql.prepare()
|
||||
self.__create_tb()
|
||||
self.__insert_data()
|
||||
self.all_test()
|
||||
|
||||
tdLog.printNoPrefix("==========step2:create table in rollup database")
|
||||
tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m")
|
||||
tdSql.execute("use db3")
|
||||
self.__create_tb()
|
||||
self.__insert_data()
|
||||
# self.__create_tb()
|
||||
tdSql.execute(f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL}) ")
|
||||
self.all_test()
|
||||
|
||||
# self.__insert_data()
|
||||
|
||||
tdSql.execute("drop database if exists db1 ")
|
||||
tdSql.execute("drop database if exists db2 ")
|
||||
|
|
|
@ -0,0 +1,241 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
import math
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.vgroups = 4
|
||||
self.ctbNum = 10
|
||||
self.rowsPerTbl = 10000
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
||||
def prepareTestEnv(self):
|
||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 10,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 3,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 1}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
tdLog.info("create ctb")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
tdDnodes.stop(1)
|
||||
tdDnodes.start(1)
|
||||
return
|
||||
|
||||
def tmqCase3(self):
|
||||
tdLog.printNoPrefix("======== test case 3: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 10,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 10,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 1}
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
totalRowsInserted = expectRowsList[0]
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 3
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
|
||||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
consumerId = 4
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor 0")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 2
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
actConsumeTotalRows = resultList[0] + resultList[1]
|
||||
|
||||
if not (totalRowsInserted == actConsumeTotalRows):
|
||||
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
||||
|
||||
def tmqCase4(self):
|
||||
tdLog.printNoPrefix("======== test case 4: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 10,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 10,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 1}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
totalRowsInserted = expectRowsList[0]
|
||||
|
||||
# init consume info, and start tmq_sim, then check consume result
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 5
|
||||
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
|
||||
topicList = topicNameList[0]
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor 0")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("wait commit notify")
|
||||
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||
|
||||
tdLog.info("pkill consume processor")
|
||||
tdCom.killProcessor("tmq_sim")
|
||||
|
||||
# time.sleep(10)
|
||||
|
||||
# reinit consume info, and start tmq_sim, then check consume result
|
||||
tmqCom.initConsumerTable()
|
||||
consumerId = 6
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor 1")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
tdLog.info("wait the consume result")
|
||||
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
actConsumeTotalRows = resultList[0]
|
||||
|
||||
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
|
||||
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
|
||||
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
|
||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
|
||||
tdLog.printNoPrefix("======== test case 4 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.prepareTestEnv()
|
||||
self.tmqCase3()
|
||||
self.tmqCase4()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -85,7 +85,7 @@ class TDTestCase:
|
|||
'rowsPerTbl': 10000,
|
||||
'batchNum': 10,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 10,
|
||||
'pollDelay': 15,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 1}
|
||||
|
|
|
@ -19,11 +19,15 @@ python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
|
|||
python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
|
||||
python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
|
||||
python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
|
||||
python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
|
||||
python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
|
||||
python3 ./test.py -f 1-insert/alter_stable.py
|
||||
python3 ./test.py -f 1-insert/alter_table.py
|
||||
python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
|
||||
python3 ./test.py -f 1-insert/table_comment.py
|
||||
python3 ./test.py -f 1-insert/time_range_wise.py
|
||||
python3 ./test.py -f 1-insert/block_wise.py
|
||||
python3 ./test.py -f 1-insert/create_retentions.py
|
||||
|
||||
#python3 ./test.py -f 1-insert/table_param_ttl.py
|
||||
python3 ./test.py -f 2-query/between.py
|
||||
python3 ./test.py -f 2-query/distinct.py
|
||||
|
@ -114,19 +118,19 @@ python3 ./test.py -f 2-query/twa.py
|
|||
python3 ./test.py -f 2-query/irate.py
|
||||
|
||||
python3 ./test.py -f 2-query/function_null.py
|
||||
python3 ./test.py -f 2-query/queryQnode.py
|
||||
python3 ./test.py -f 2-query/queryQnode.py
|
||||
|
||||
#python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||
#python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||
#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
|
||||
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
|
||||
#python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
|
||||
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
|
||||
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
|
||||
# python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
|
||||
|
||||
|
@ -158,3 +162,4 @@ python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
|||
python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
|
||||
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py
|
||||
python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg.py
|
||||
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg.py
|
||||
|
|
Loading…
Reference in New Issue