Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_snapshot
This commit is contained in:
commit
eaa0a4282c
|
@ -56,20 +56,22 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
|||
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
|
||||
}
|
||||
|
||||
if (pMgmt->transId == transId) {
|
||||
if (pMgmt->transId == transId && transId != 0) {
|
||||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
|
||||
}
|
||||
pMgmt->transId = 0;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
} else {
|
||||
#if 1
|
||||
mError("trans:%d, invalid commit msg since trandId not match with %d", transId, pMgmt->transId);
|
||||
#else
|
||||
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
||||
if (pTrans != NULL) {
|
||||
mndTransExecute(pMnode, pTrans);
|
||||
mndReleaseTrans(pMnode, pTrans);
|
||||
}
|
||||
#if 0
|
||||
sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
|
||||
// sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1976,7 +1976,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "leastsquares",
|
||||
.type = FUNCTION_TYPE_LEASTSQUARES,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||
.translateFunc = translateLeastSQR,
|
||||
.getEnvFunc = getLeastSQRFuncEnv,
|
||||
.initFunc = leastSQRFunctionSetup,
|
||||
|
|
|
@ -218,7 +218,7 @@ static SNode* createConstantValue() {
|
|||
static int32_t calcConstProjections(SCalcConstContext* pCxt, SSelectStmt* pSelect, bool subquery) {
|
||||
SNode* pProj = NULL;
|
||||
WHERE_EACH(pProj, pSelect->pProjectionList) {
|
||||
if (subquery && isUselessCol((SExprNode*)pProj)) {
|
||||
if (subquery && !pSelect->isDistinct && isUselessCol((SExprNode*)pProj)) {
|
||||
ERASE_NODE(pSelect->pProjectionList);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -4757,8 +4757,13 @@ static int32_t extractQueryResultSchema(const SNodeList* pProjections, int32_t*
|
|||
int32_t index = 0;
|
||||
FOREACH(pNode, pProjections) {
|
||||
SExprNode* pExpr = (SExprNode*)pNode;
|
||||
(*pSchema)[index].type = pExpr->resType.type;
|
||||
(*pSchema)[index].bytes = pExpr->resType.bytes;
|
||||
if (TSDB_DATA_TYPE_NULL == pExpr->resType.type) {
|
||||
(*pSchema)[index].type = TSDB_DATA_TYPE_VARCHAR;
|
||||
(*pSchema)[index].bytes = 0;
|
||||
} else {
|
||||
(*pSchema)[index].type = pExpr->resType.type;
|
||||
(*pSchema)[index].bytes = pExpr->resType.bytes;
|
||||
}
|
||||
(*pSchema)[index].colId = index + 1;
|
||||
if ('\0' != pExpr->userAlias[0]) {
|
||||
strcpy((*pSchema)[index].name, pExpr->userAlias);
|
||||
|
|
|
@ -35,36 +35,51 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print =============== step3: create show table
|
||||
print =============== step4: create show table
|
||||
sql create table ct1 using stb tags(1000)
|
||||
sql create table ct2 using stb tags(2000)
|
||||
sql create table ct3 using stb tags(3000)
|
||||
sql show tables
|
||||
if $rows != 1 then
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step5: insert data
|
||||
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
|
||||
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
|
||||
sql insert into ct2 values(now+0s, 10, 2.0, 3.0)
|
||||
sql insert into ct2 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
|
||||
sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0)
|
||||
|
||||
print =============== step6: select data
|
||||
print =============== step6: query data
|
||||
sql select * from ct1
|
||||
#sql select * from stb
|
||||
sql select * from stb
|
||||
sql select c1, c2, c3 from ct1
|
||||
sql select ts, c1, c2, c3 from stb
|
||||
|
||||
print =============== step7: count
|
||||
sql select count(*) from ct1;
|
||||
#sql select count(*) from stb;
|
||||
#sql select count(ts), count(c1), count(c2), count(c3) from ct1
|
||||
#sql select count(ts), count(c1), count(c2), count(c3) from stb
|
||||
|
||||
print =============== step8: func
|
||||
#sql select first(ts), first(c1), first(c2), first(c3) from ct1
|
||||
#sql select min(c1), min(c2), min(c3) from ct1
|
||||
#sql select max(c1), max(c2), max(c3) from ct1
|
||||
#sql select sum(c1), sum(c2), sum(c3) from ct1
|
||||
|
||||
_OVER:
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
||||
print =============== check
|
||||
print ----> start to check if there are ERRORS in vagrind log file for each dnode
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content <= 0 then
|
||||
return 0
|
||||
endi
|
||||
|
||||
$null=
|
||||
if $system_content == $null then
|
||||
return 0
|
||||
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content > 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
return -1
|
||||
if $system_content == $null then
|
||||
return -1
|
||||
endi
|
||||
|
|
|
@ -23,48 +23,74 @@ if $data(1)[4] != ready then
|
|||
endi
|
||||
|
||||
print =============== step2: create db
|
||||
sql create database d1 vgroups 1 buffer 3
|
||||
sql create database d1 vgroups 3 buffer 3
|
||||
sql show databases
|
||||
sql use d1
|
||||
sql show vgroups
|
||||
|
||||
print =============== step3: create show stable
|
||||
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
|
||||
print =============== step3: create show stable, include all type
|
||||
sql create table if not exists stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(16), c9 nchar(16), c10 timestamp, c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned) tags (t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 binary(16), t9 nchar(16), t10 timestamp, t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)
|
||||
sql create stable if not exists stb_1 (ts timestamp, c1 int) tags (j int)
|
||||
sql create table stb_2 (ts timestamp, c1 int) tags (t1 int)
|
||||
sql create stable stb_3 (ts timestamp, c1 int) tags (t1 int)
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step3: create show table
|
||||
sql create table ct1 using stb tags(1000)
|
||||
#sql show tables
|
||||
#if $rows != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
print =============== step4: ccreate child table
|
||||
sql create table c1 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql create table c2 using stb tags(false, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 2', 'child tbl 2', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step5: insert data
|
||||
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
|
||||
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
|
||||
sql insert into c1 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql insert into c2 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql insert into c2 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
|
||||
print =============== step6: select data
|
||||
sql select * from ct1
|
||||
print =============== step6: alter insert
|
||||
sql insert into c3 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql insert into c3 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
|
||||
goto _OVER
|
||||
print =============== stepa: query data
|
||||
sql select * from c1
|
||||
sql select * from stb
|
||||
sql select * from stb_1
|
||||
sql select ts, c1, c2, c3 from c1
|
||||
sql select ts, c1, c2, c3 from stb
|
||||
sql select ts, c1 from stb_2
|
||||
sql select ts, c1, t1 from c1
|
||||
sql select ts, c1, t1 from stb
|
||||
sql select ts, c1, t1 from stb_2
|
||||
|
||||
print =============== stepb: count
|
||||
#sql select count(*) from c1;
|
||||
#sql select count(*) from stb;
|
||||
#sql select count(ts), count(c1), count(c2), count(c3) from c1
|
||||
#sql select count(ts), count(c1), count(c2), count(c3) from stb
|
||||
|
||||
print =============== stepc: func
|
||||
#sql select first(ts), first(c1), first(c2), first(c3) from c1
|
||||
#sql select min(c1), min(c2), min(c3) from c1
|
||||
#sql select max(c1), max(c2), max(c3) from c1
|
||||
#sql select sum(c1), sum(c2), sum(c3) from c1
|
||||
|
||||
_OVER:
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
||||
print =============== check
|
||||
print ----> start to check if there are ERRORS in vagrind log file for each dnode
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content <= 0 then
|
||||
return 0
|
||||
endi
|
||||
|
||||
$null=
|
||||
if $system_content == $null then
|
||||
return 0
|
||||
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content > 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
return -1
|
||||
if $system_content == $null then
|
||||
return -1
|
||||
endi
|
||||
|
|
|
@ -36,7 +36,12 @@ sql create dnode $hostname port 7200
|
|||
sql drop dnode 2
|
||||
sql alter dnode 1 'debugflag 131'
|
||||
|
||||
print =============== step4:
|
||||
print =============== create database, stable, table
|
||||
sql create database db vgroups 3
|
||||
sql use db
|
||||
sql create table stb (ts timestamp, c int) tags (t int)
|
||||
sql create table t0 using stb tags (0)
|
||||
sql create table tba (ts timestamp, c1 binary(10), c2 nchar(10));
|
||||
|
||||
print =============== run show xxxx
|
||||
sql show dnodes
|
||||
|
@ -50,6 +55,16 @@ if $rows != 1 then
|
|||
endi
|
||||
|
||||
sql show databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -70,11 +85,31 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select * from information_schema.user_databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from information_schema.user_stables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from information_schema.user_tables
|
||||
if $rows != 30 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from information_schema.user_users
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from information_schema.`vgroups`
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show variables;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
|
@ -94,17 +129,14 @@ print =============== stop
|
|||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
||||
print =============== check
|
||||
print ----> start to check if there are ERRORS in vagrind log file for each dnode
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content <= 0 then
|
||||
return 0
|
||||
endi
|
||||
|
||||
$null=
|
||||
if $system_content == $null then
|
||||
return 0
|
||||
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content > 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
return -1
|
||||
if $system_content == $null then
|
||||
return -1
|
||||
endi
|
||||
|
|
|
@ -35,7 +35,7 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print =============== step3: create show table
|
||||
print =============== step4: create show table
|
||||
sql create table ct1 using stb tags(1000)
|
||||
sql show tables
|
||||
if $rows != 1 then
|
||||
|
@ -54,17 +54,14 @@ _OVER:
|
|||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
||||
print =============== check
|
||||
print ----> start to check if there are ERRORS in vagrind log file for each dnode
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content <= 0 then
|
||||
return 0
|
||||
endi
|
||||
|
||||
$null=
|
||||
if $system_content == $null then
|
||||
return 0
|
||||
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content > 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
return -1
|
||||
if $system_content == $null then
|
||||
return -1
|
||||
endi
|
||||
|
|
|
@ -4,7 +4,7 @@ system sh/cfg.sh -n dnode1 -c debugflag -v 131
|
|||
system sh/exec.sh -n dnode1 -s start -v
|
||||
sql connect
|
||||
|
||||
print =============== step1: show dnodes
|
||||
print =============== step1: create drop show dnodes
|
||||
$x = 0
|
||||
step1:
|
||||
$x = $x + 1
|
||||
|
@ -22,124 +22,74 @@ if $data(1)[4] != ready then
|
|||
goto step1
|
||||
endi
|
||||
|
||||
print =============== step2: create alter drop show user
|
||||
sql create user u1 pass 'taosdata'
|
||||
sql show users
|
||||
sql alter user u1 sysinfo 1
|
||||
sql alter user u1 enable 1
|
||||
sql alter user u1 pass 'taosdata'
|
||||
sql drop user u1
|
||||
sql_error alter user u2 sysinfo 0
|
||||
|
||||
print =============== step3: create drop dnode
|
||||
sql create dnode $hostname port 7200
|
||||
sql drop dnode 2
|
||||
sql alter dnode 1 'debugflag 131'
|
||||
|
||||
print =============== create database, stable, table
|
||||
sql create database db vgroups 3
|
||||
sql use db
|
||||
sql create table stb (ts timestamp, c int) tags (t int)
|
||||
sql create table t0 using stb tags (0)
|
||||
sql create table tba (ts timestamp, c1 binary(10), c2 nchar(10));
|
||||
|
||||
print =============== run show xxxx
|
||||
sql show dnodes
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show mnodes
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step2: create db
|
||||
sql create database d1 vgroups 3 buffer 3
|
||||
sql show databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
sql use d1
|
||||
sql show vgroups
|
||||
|
||||
print =============== step3: create show stable, include all type
|
||||
sql create table if not exists stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(16), c9 nchar(16), c10 timestamp, c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned) tags (t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 binary(16), t9 nchar(16), t10 timestamp, t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)
|
||||
sql create stable if not exists stb_1 (ts timestamp, c1 int) tags (j int)
|
||||
sql create table stb_2 (ts timestamp, c1 int) tags (t1 int)
|
||||
sql create stable stb_3 (ts timestamp, c1 int) tags (t1 int)
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step4: ccreate child table
|
||||
sql create table c1 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql create table c2 using stb tags(false, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 2', 'child tbl 2', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show users
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
print =============== step5: insert data
|
||||
sql insert into c1 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql insert into c2 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql insert into c2 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
|
||||
print =============== run select * from information_schema.xxxx
|
||||
sql select * from information_schema.`dnodes`
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
print =============== step6: alter insert
|
||||
sql insert into c3 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
sql insert into c3 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
|
||||
|
||||
sql select * from information_schema.`mnodes`
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
print =============== stepa: query data
|
||||
sql select * from c1
|
||||
sql select * from stb
|
||||
sql select * from stb_1
|
||||
sql select ts, c1, c2, c3 from c1
|
||||
sql select ts, c1, c2, c3 from stb
|
||||
sql select ts, c1 from stb_2
|
||||
sql select ts, c1, t1 from c1
|
||||
sql select ts, c1, t1 from stb
|
||||
sql select ts, c1, t1 from stb_2
|
||||
|
||||
sql select * from information_schema.user_databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
print =============== stepb: count
|
||||
#sql select count(*) from c1;
|
||||
#sql select count(*) from stb;
|
||||
#sql select count(ts), count(c1), count(c2), count(c3) from c1
|
||||
#sql select count(ts), count(c1), count(c2), count(c3) from stb
|
||||
|
||||
sql select * from information_schema.user_stables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
print =============== stepc: func
|
||||
#sql select first(ts), first(c1), first(c2), first(c3) from c1
|
||||
#sql select min(c1), min(c2), min(c3) from c1
|
||||
#sql select max(c1), max(c2), max(c3) from c1
|
||||
#sql select sum(c1), sum(c2), sum(c3) from c1
|
||||
|
||||
sql select * from information_schema.user_tables
|
||||
if $rows != 30 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from information_schema.user_users
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from information_schema.`vgroups`
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show variables;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show dnode 1 variables;
|
||||
if $rows <= 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show local variables;
|
||||
if $rows <= 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== stop
|
||||
_OVER:
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
||||
print =============== check
|
||||
print ----> start to check if there are ERRORS in vagrind log file for each dnode
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content <= 0 then
|
||||
return 0
|
||||
endi
|
||||
|
||||
$null=
|
||||
if $system_content == $null then
|
||||
return 0
|
||||
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
print cmd return result ----> [ $system_content ]
|
||||
if $system_content > 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
return -1
|
||||
if $system_content == $null then
|
||||
return -1
|
||||
endi
|
||||
|
|
|
@ -371,7 +371,7 @@ class TMQCom:
|
|||
elif (i % 3 == 0):
|
||||
tagBinaryValue = 'changsha'
|
||||
|
||||
sql += " %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
|
||||
sql += " %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
|
||||
for j in range(rowsPerTbl):
|
||||
sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched)
|
||||
rowsBatched += 1
|
||||
|
@ -379,7 +379,7 @@ class TMQCom:
|
|||
tsql.execute(sql)
|
||||
rowsBatched = 0
|
||||
if j < rowsPerTbl - 1:
|
||||
sql = "insert into %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
|
||||
sql = "insert into %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
|
||||
else:
|
||||
sql = "insert into "
|
||||
#end sql
|
||||
|
|
|
@ -0,0 +1,259 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
from enum import Enum
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.snapshot = 0
|
||||
self.vgroups = 4
|
||||
self.ctbNum = 1
|
||||
self.rowsPerTbl = 100000
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
||||
def prepareTestEnv(self):
|
||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 100000,
|
||||
'batchNum': 1200,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 3,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
tdLog.info("create ctb")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
return
|
||||
|
||||
# 自动建表完成数据插入,启动消费
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 100000,
|
||||
'batchNum': 3000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 5,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
# update to half tables
|
||||
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
|
||||
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
|
||||
# paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
consumerId = 0
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 3/2)
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:true,\
|
||||
auto.commit.interval.ms:1000,\
|
||||
auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdSql.query(queryString)
|
||||
totalRowsInserted = tdSql.getRows()
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted))
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def tmqCase2(self):
|
||||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 5000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 5,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
|
||||
# update to half tables
|
||||
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
|
||||
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
|
||||
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
|
||||
# paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
consumerId = 1
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:true,\
|
||||
auto.commit.interval.ms:1000,\
|
||||
auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdSql.query(queryString)
|
||||
totalRowsInserted = tdSql.getRows()
|
||||
|
||||
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
# self.prepareTestEnv()
|
||||
# tdLog.printNoPrefix("=============================================")
|
||||
# tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
# self.tmqCase1()
|
||||
# self.tmqCase2()
|
||||
|
||||
self.prepareTestEnv()
|
||||
tdLog.printNoPrefix("====================================================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
|
||||
self.snapshot = 1
|
||||
self.tmqCase1()
|
||||
# self.tmqCase2()
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -16,9 +16,11 @@ from tmqCommon import *
|
|||
|
||||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.snapshot = 0
|
||||
self.vgroups = 4
|
||||
self.ctbNum = 500
|
||||
self.ctbNum = 100
|
||||
self.rowsPerTbl = 1000
|
||||
self.autoCtbPrefix = 'aCtb'
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
|
@ -39,7 +41,7 @@ class TDTestCase:
|
|||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1000,
|
||||
'rowsPerTbl': 1000,
|
||||
'batchNum': 400,
|
||||
'batchNum': 10000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 3,
|
||||
'showMsg': 1,
|
||||
|
@ -61,7 +63,7 @@ class TDTestCase:
|
|||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
|
@ -85,20 +87,21 @@ class TDTestCase:
|
|||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1000,
|
||||
'rowsPerTbl': 1000,
|
||||
'batchNum': 400,
|
||||
'batchNum': 10000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 5,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
# update to half tables
|
||||
paraDict['ctbNum'] = int(self.ctbNum / 2)
|
||||
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
paraDict['ctbNum'] = int(self.ctbNum/2)
|
||||
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
|
||||
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
|
@ -113,11 +116,16 @@ class TDTestCase:
|
|||
tdSql.execute(sqlString)
|
||||
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
consumerId = 0
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 3)
|
||||
if self.snapshot == 0:
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2))
|
||||
elif self.snapshot == 1:
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2))
|
||||
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:true,\
|
||||
auto.commit.interval.ms:1000,\
|
||||
|
@ -127,7 +135,7 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
tdLog.info("start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
|
@ -141,8 +149,9 @@ class TDTestCase:
|
|||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def tmqCase2(self):
|
||||
|
@ -160,13 +169,14 @@ class TDTestCase:
|
|||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1000,
|
||||
'rowsPerTbl': 1000,
|
||||
'batchNum': 1000,
|
||||
'batchNum': 10000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 5,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 1}
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
@ -175,13 +185,20 @@ class TDTestCase:
|
|||
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
|
||||
# update to half tables
|
||||
paraDict['ctbNum'] = int(self.ctbNum / 2)
|
||||
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
paraDict['ctbNum'] = int(self.ctbNum/2)
|
||||
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
|
||||
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
|
||||
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum'])
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
|
||||
|
||||
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby",
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
|
||||
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum'])
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdLog.info("create topics from stb1")
|
||||
|
@ -191,12 +208,18 @@ class TDTestCase:
|
|||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
|
||||
# paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 * 2
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
consumerId = 1
|
||||
if self.snapshot == 0:
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2))
|
||||
elif self.snapshot == 1:
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2))
|
||||
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:true,\
|
||||
auto.commit.interval.ms:1000,\
|
||||
|
@ -206,7 +229,7 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
tdLog.info("start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
|
@ -218,10 +241,10 @@ class TDTestCase:
|
|||
|
||||
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
|
||||
|
||||
if totalConsumeRows != totalRowsInserted:
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tmqCom.checkFileContent(consumerId, queryString)
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
|
@ -230,6 +253,15 @@ class TDTestCase:
|
|||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.prepareTestEnv()
|
||||
tdLog.printNoPrefix("=============================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
self.tmqCase1()
|
||||
self.tmqCase2()
|
||||
|
||||
self.prepareTestEnv()
|
||||
tdLog.printNoPrefix("====================================================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
|
||||
self.snapshot = 1
|
||||
self.tmqCase1()
|
||||
self.tmqCase2()
|
||||
|
|
@ -178,6 +178,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
|
|||
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
|
||||
python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
|
||||
#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
|
||||
#python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py
|
||||
#python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb.py
|
||||
|
||||
#------------querPolicy 2-----------
|
||||
|
||||
|
|
Loading…
Reference in New Issue