From ccebd62e07d712a8a9ca4ebd48128eca173acd37 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Tue, 19 Jul 2022 16:04:05 +0800 Subject: [PATCH 01/21] update --- tests/system-test/1-insert/delete_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/1-insert/delete_data.py b/tests/system-test/1-insert/delete_data.py index a7eba2d97d..3308c2e542 100644 --- a/tests/system-test/1-insert/delete_data.py +++ b/tests/system-test/1-insert/delete_data.py @@ -157,7 +157,7 @@ class TDTestCase: elif 'nchar' in column_type.lower(): tdSql.error(f'''delete from {tbname} where {error_list} {column_name} ="{base_data['nchar']}"''') else: - tdSql.error('delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}') + tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}') def delete_data_ntb(self): tdSql.execute(f'create database if not exists {self.dbname}') From 29b8eb76ca95a8da941e156d91f5bdc874eddcb4 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Tue, 19 Jul 2022 19:48:33 +0800 Subject: [PATCH 02/21] update --- tests/system-test/1-insert/delete_data.py | 59 +++++++++++++++++------ 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/tests/system-test/1-insert/delete_data.py b/tests/system-test/1-insert/delete_data.py index 3308c2e542..836c32d757 100644 --- a/tests/system-test/1-insert/delete_data.py +++ b/tests/system-test/1-insert/delete_data.py @@ -28,6 +28,7 @@ class TDTestCase: tdSql.init(conn.cursor(),logSql) self.dbname = 'db_test' self.setsql = TDSetSql() + self.stbname = 'stb' self.ntbname = 'ntb' self.rowNum = 10 self.tbnum = 20 @@ -51,6 +52,7 @@ class TDTestCase: 'col13': f'nchar({self.str_length})', } + self.tinyint_val = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX) self.smallint_val = random.randint(constant.SMALLINT_MIN,constant.SMALLINT_MAX) self.int_val = random.randint(constant.INT_MIN,constant.INT_MAX) @@ -146,10 +148,36 @@ class TDTestCase: else: tdSql.checkEqual(tdSql.queryResult[0][0],base_data[column_type]) - def delete_rows(self): + def delete_rows(self,dbname,tbname,col_name,col_type,base_data,rowNum): + for i in range(rowNum): + tdSql.execute(f'delete from {tbname} where ts>{self.ts+i}') + tdSql.execute(f'flush database {dbname}') + tdSql.execute('reset query cache') + tdSql.query(f'select {col_name} from {tbname}') + tdSql.checkRows(i+1) + self.insert_base_data(col_type,tbname,rowNum,base_data) + for i in range(rowNum): + tdSql.execute(f'delete from {tbname} where ts>={self.ts+i}') + tdSql.execute(f'flush database {dbname}') + tdSql.execute('reset query cache') + tdSql.query(f'select {col_name} from {tbname}') + tdSql.checkRows(i) + self.insert_base_data(col_type,tbname,rowNum,base_data) + for i in range(rowNum): + tdSql.execute(f'delete from {tbname} where ts<={self.ts+i}') + tdSql.execute(f'flush database {dbname}') + tdSql.execute('reset query cache') + tdSql.query(f'select {col_name} from {tbname}') + tdSql.checkRows(rowNum-i-1) + self.insert_base_data(col_type,tbname,rowNum,base_data) + for i in range(rowNum): + tdSql.execute(f'delete from {tbname} where ts<{self.ts+i}') + tdSql.execute(f'flush database {dbname}') + tdSql.execute('reset query cache') + tdSql.query(f'select {col_name} from {tbname}') + tdSql.checkRows(rowNum-i) + self.insert_base_data(col_type,tbname,rowNum,base_data) - - pass def delete_error(self,tbname,column_name,column_type,base_data): for error_list in ['',f'ts = {self.ts} and',f'ts = {self.ts} or']: if 'binary' in column_type.lower(): @@ -168,19 +196,22 @@ class TDTestCase: self.delete_one_row(self.ntbname,col_type,col_name,self.base_data,self.dbname) self.delete_all_data(self.ntbname,col_type,self.rowNum,self.base_data,self.dbname) self.delete_error(self.ntbname,col_name,col_type,self.base_data) - for i in range(self.rowNum): - tdSql.execute(f'delete from {self.ntbname} where ts>{self.ts+i}') - tdSql.execute(f'flush database {self.dbname}') - tdSql.execute('reset query cache') - tdSql.query(f'select {col_name} from {self.ntbname}') - tdSql.checkRows(i+1) - self.insert_base_data(col_type,self.ntbname,self.rowNum,self.base_data) - + self.delete_rows(self.dbname,self.ntbname,col_name,col_type,self.base_data,self.rowNum) tdSql.execute(f'drop table {self.ntbname}') - + tdSql.execute(f'drop database {self.dbname}') + def delete_data_ctb(self): + tdSql.execute(f'create database if not exists {self.dbname}') + tdSql.execute(f'use {self.dbname}') + for col_name,col_type in self.column_dict.items(): + tdSql.execute(f'create table {self.stbname} (ts timestamp,{col_name} {col_type}) tags(t1 int)') + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags(1)') + self.insert_base_data(col_type,f'{self.stbname}_{i}',self.rowNum,self.base_data) + self.delete_one_row(f'{self.stbname}_{i}',col_type,col_name,self.base_data,self.dbname) + def run(self): - self.delete_data_ntb() - + # self.delete_data_ntb() + self.delete_data_ctb() def stop(self): tdSql.close() From 578a23d225af1ef60e4214cf605566a18e611b74 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Wed, 20 Jul 2022 09:32:47 +0800 Subject: [PATCH 03/21] update --- tests/system-test/1-insert/delete_data.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/system-test/1-insert/delete_data.py b/tests/system-test/1-insert/delete_data.py index 836c32d757..27153185b9 100644 --- a/tests/system-test/1-insert/delete_data.py +++ b/tests/system-test/1-insert/delete_data.py @@ -25,7 +25,7 @@ from util.sqlset import TDSetSql class TDTestCase: def init(self, conn, logSql): tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor(),logSql) + tdSql.init(conn.cursor()) self.dbname = 'db_test' self.setsql = TDSetSql() self.stbname = 'stb' @@ -208,10 +208,14 @@ class TDTestCase: tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags(1)') self.insert_base_data(col_type,f'{self.stbname}_{i}',self.rowNum,self.base_data) self.delete_one_row(f'{self.stbname}_{i}',col_type,col_name,self.base_data,self.dbname) + self.delete_all_data(f'{self.stbname}_{i}',col_type,self.rowNum,self.base_data,self.dbname) + self.delete_error(f'{self.stbname}_{i}',col_name,col_type,self.base_data) + self.delete_rows(self.dbname,f'{self.stbname}_{i}',col_name,col_type,self.base_data,self.rowNum) + tdSql.execute(f'drop table {self.stbname}') def run(self): - # self.delete_data_ntb() - self.delete_data_ctb() + self.delete_data_ntb() + # self.delete_data_ctb() def stop(self): tdSql.close() From bc0e8e306176fc09f9e32175cabb919f2faa5089 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 20 Jul 2022 09:48:57 +0800 Subject: [PATCH 04/21] fix:invalidate in telnet schemaless --- source/client/src/clientSml.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 078ecbb4db..ca45202547 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -2210,7 +2210,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) { (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen); if (tableMeta) { // update meta ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf); - if (!hasTable && ret) { + if (!hasTable && ret == TSDB_CODE_SUCCESS) { ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf); } if (ret != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ad12ee60f1..2cfe3548c9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2806,7 +2806,6 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl if (pCond->suid != 0) { (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, (*ppReader)->suid, -1); - ASSERT((*ppReader)->pSchema); } else if (taosArrayGetSize(pTableList) > 0) { STableKeyInfo* pKey = taosArrayGet(pTableList, 0); (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, pKey->uid, -1); From 31e42a44caaa2b1cc178b71402f8cc252df4f364 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 09:50:03 +0800 Subject: [PATCH 05/21] refactor(sync): delete msg handle in mnode commit --- source/dnode/mnode/impl/src/mndSync.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 36362bed87..a9c56a339b 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -44,6 +44,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; + // delete msg handle + SRpcMsg rpcMsg = {0}; + syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info); + int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); pMgmt->errCode = cbMeta.code; mDebug("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 From 2a68aaf1ad4194f345fe9a56b724db54bbe6bc30 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Wed, 20 Jul 2022 09:59:44 +0800 Subject: [PATCH 06/21] update --- tests/system-test/1-insert/alter_table.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/system-test/1-insert/alter_table.py b/tests/system-test/1-insert/alter_table.py index 4a9cfd30c7..0007210ccd 100644 --- a/tests/system-test/1-insert/alter_table.py +++ b/tests/system-test/1-insert/alter_table.py @@ -211,10 +211,10 @@ class TDTestCase: for error in [constant.INT_UN_MIN-1,constant.INT_UN_MAX+1]: tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}') #! bug TD-17106 - # elif v.lower() == 'bigint unsigned': - # self.tag_check(i,k,tag_unbigint) - # for error in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1]: - # tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}') + elif v.lower() == 'bigint unsigned': + self.tag_check(i,k,tag_unbigint) + for error in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1]: + tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}') elif v.lower() == 'bool': self.tag_check(i,k,tag_bool) elif v.lower() == 'float': @@ -225,8 +225,8 @@ class TDTestCase: else: tdLog.exit(f'select {k} from {self.stbname}_{i},data check failure') #! bug TD-17106 - # for error in [constant.FLOAT_MIN*1.1,constant.FLOAT_MAX*1.1]: - # tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}') + for error in [constant.FLOAT_MIN*1.1,constant.FLOAT_MAX*1.1]: + tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}') elif v.lower() == 'double': tdSql.execute(f'alter table {self.stbname}_{i} set tag {k} = {tag_double}') tdSql.query(f'select {k} from {self.stbname}_{i}') From 3a65f10f86b503b53d05e756405308ac75b23850 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 20 Jul 2022 11:01:06 +0800 Subject: [PATCH 07/21] fix: avoid rpc mem leak --- source/libs/transport/inc/transComm.h | 2 +- source/libs/transport/src/.transComm.c.swo | Bin 0 -> 28672 bytes source/libs/transport/src/transCli.c | 7 ++++--- source/libs/transport/src/transComm.c | 7 ++++++- 4 files changed, 11 insertions(+), 5 deletions(-) create mode 100644 source/libs/transport/src/.transComm.c.swo diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 2972f512f1..37ee1f4b0c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -392,7 +392,7 @@ typedef struct SDelayQueue { } SDelayQueue; int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); -void transDQDestroy(SDelayQueue* queue); +void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); diff --git a/source/libs/transport/src/.transComm.c.swo b/source/libs/transport/src/.transComm.c.swo new file mode 100644 index 0000000000000000000000000000000000000000..72ffc92ce91fc2c808e34fe9b349cc0bff6709f0 GIT binary patch literal 28672 zcmeI53virQb;p+^glC#SC=Bl#1v@KQvMeV~9NSTlSF$avm!(x4ClfTQ-EXCh_vLTD zwWZh&lk#e(3@HP&kU*JI2pw7;<=`gFw-`elHuXE2i_uPBVJ*zBkyJ39N?A$t(;^&f7>NT~mFNU}Fq%J&_N`(=OU zpv!?S2f7^Sa-hqBE(f|C=yIUTf!`PoMD@p~UdC+Bb2B>bem}0`_XY0x4)^;+NBQI3 z^K0Gj2Rq6ybkDcD->-55_;Jp6@AtXi7dV1^xqIFH)#X5!16>YuInd=mmjhi6bUD!F zK$inu4sH+Wua1d+; zmxHfgluF$Rejl6$r$7J-U<>F2mx70%no2zc-V5FgUJiZ-l)(^q9QfQ*QmOZW_kuiF z1X(Z(4uK27PoJDh{RDgiybaXARp9yHdEkL3rBdGm_k%wHuLFx<61)hcz>lApO8p4@ z5WEB239bYE;JM&g;F;hnPe`Rsg5%&~@W%u)-UKS(I`ATJKKK>^kZ*#!!R?>|7QrEK zJ=hIi1b!P_3jTqB%6;Gi;1%HI;AU_T>;~6@o#5Hv0`MRKl3##(!No*k?dLv|x1ak>^Y{Wy z7ot8sQP^uvrcTpI7(~smYPRj>Gvx!rQ7y`q&2?sANM2QfN+XEUl=Pcz{iad6EvOaK zeLVN}?rsgADF?ZbUZJdgc&WI~NJ?|Ek75 zxv;Od(pWMBp3b?f-s!4H@ZN*%<5ok9EB3}-wFiu%1f~g%3!es{Lo#UIOhs>Tjy+RJ-x4r5!bt$Al zy8XROONMdMiiaH>G^I+tY|3azm|CpW%9ilaAWuJG(2&Vn&ItMa2D+}?z7QEmjUW0< zk;e`(HJbUnjM%|tD<{uJsRXexIV=I`idXzAtxuZ(2EjDLkEeypMsMbn86n{8(EW~(iGglzQH ztBMGLA^QgPUz1%9OtBO;B2y$;%q*@(LBo_9h9zmDnk>kO8V!V9EhMYt!oW16Qkm6? zCfCf-AgF8AT390YjP=d1OVP3zl?qCdTMF!e%zRZVdal!6bXNu~Q<1V8b@!#X=z{=9=Y5 z)WPWhEKsd6E$y(XlQa>kv{oTIRGT@Bg<&mB_Y`xbvgLD5tgsks%qDgTk%V4tF~oL3 z!VmP%YFj)VOFkNcb|y`cF>2LBY0JaBGV)_*YEND3$Vn>NlO7TZs!>Vo4AMC#K{cWr9mI!$Qne9HE+Mdv_hC@f*ZzIB zkX;T@j_GPXu*6K(3e9pLnue5SvIl0zMn)H=r!r#;<5S~VG~n`5HD7KP0@GuUe3I0> z+#~OcSY$IlH?}Z;LrOQiK9eigs!QfXiH#{*X3waHwWTmu*=oj%rdpHr8U$v~aun5f z4Gx|-aboLIwYjwxE|Ei>52}rzF*v-HzRZzY(=?WA&2qsk=Z*)kHXoE^>&~I~^;OyR zWZ&FBHE%}b$XYY|gK7}6EzUF->B~&m{^(1(+c%z_!ZMnD)3av8%#6%t$20R2BeP~^ zes*ShZcN7%o-Abp& zKs*PbPKGmArE%6=#AXCxLyiGgt~bA;4BK$S?lP;$F3%Z*>GUJG*6bm=S{P(5m0F=x zTy=BHAWzOT%MpjGUbB7M(6uHzGCgNTakl!+cs0M(?74NBjq}B_d5QzIm5q2f#UVue z|DWL3s$Y!%zt1^`U&P;k3%Cm$1%u%E;Mw5E`24Q`b#N;vg2P}QWWhDyzw!6K2JQjx z0IvYc;2;Yea@M&;2cr&Ph!{8v;4+g*%AUwJpbbq=W z=yIUTfi4HS9O!c3H=hHS2UQMR>PfeP4;pSI4pb-y>enV?3lb_3pU)O2Lm928>L|W9 z(UGy01KNl4U|fd;z@$GH36Mc&1A@zqU^XbGtwSQ`!s2*A$0ooK+H#3_1VplJKPmhr z4Ros8;kt-N^o71Zp0B=&sHp9?BgSj#>lIQ)^2eH`aAQ((w!DKB>m^&~%~m)!LM$^X z#0qCO;>G#|_~|-cNDpUEL5DO(3Wen8buWbtM^?t#$rlP64(lQbJ_s#S&6I0a>XjuM z9YWT5c_(5@#4UV1T-CInsWe%uNz?V9D#T03q2Cmn)qG5Sp`h*+vC^72w>F;cdsFIN zV>gP%!E7|dr2Dihrzd_~Z6vB5)>g-=$J2f&f|Iy6LG^gs6nNLHr!|VwHtPN$8n@oL z6f$$Yt?>>!VZF}U%uh&6REAr;*-hPe9k<;0^k${A<;N4el{*u0>7m*aT@nw!on{R$ zDZ!#4+NfO;8dJ~?))ZQjD02aacH7_?<^;&aRjsi=$eZ!gJ@Yp#WHU1hbK_I{C&m^= zGMTZNEYbg-p&lK|afWDwsE`-<4kG159hJUuJaMgRiIAEy;e>d1Y~#oAp~F_KN3vs; z6<%_+xO)zHVM|<@h`B^dsuCm14GahM#?7T$wib^Sh!Zb%JZoTa-=Wbd4Jw+; zh>@97r%bDve1}|Kw;IYD5tGqqw5$+Q(vTXz_DIWWvoc*At2cI=(v~f@rX|@aTTH4D znyBxF@y#6_rRqE@)KZ|IrN>qzcdR~#MmNTeH3_WF=@W6Ee&atbN_ADJY?#Xj>7Gpu zi9gb(jg{-|OP7ag3LSyO{QNt>Nw5!$fnM+o@K5;np9Aj$Zvl6MJAlLiwu6hokMQg72Y(CR3~mBP zz}4Ux;1X~FI1hXipZ+gE04Bk+fcW@d0G|P;z)`RS4ug4c0XQFg5kFpX0Dcb?!6Fz3 z2f!Y%3tS6+3;Yz{{j=b&!H2FhAl)yAN2)2Mr!PCI~XtTOn{dF$p?5M_!aVf5d0^&7rY1D2^PUi z!7bo=um$u2(T6KQ{3Gg^k5&eTbKz2hr6c#mqI6D}I{sxFi=uD}KEu+Q*F1En+*C6h^U;joK?XVS~(1VvA`t zXrm+9k%jD$8DhcXQ(3~3az)&44vfqlSeTk#n3&ESR2Fam(2dCMC(%lB2aQ&Aw6(;k zHF2hltt8i2t>)QHs)e+E)Q+vF=$_5CSWEYCi+5DQjl?|lW4_jito;%D*&A4ih&>F- zX4vfDqCyNTWqR7-rQLs+bhg9=Ym}?h=Ns%0WF`=1x2j_|;Yk9!EAu)`dNUHQ3=CJ` zs#H2OKQ=$MFf%`Qz?U53cBtPR(^6q-qKgSg_SJj`o*wRx*QZCdVQxfNf?`%wiz?Jr z&I}AoxKs#<*+%k+7Ext$y3U0kYeE{cV-;3zNwCtEm53OxP}gQez^%mVEn8rmeXc42 zmW9ZmS~Jyv(^y3~W0r$Z?xyLVj^y7>ZLNenrs>KP{U?%1wEH=cxSi?@gmW9ozwojd zG=n}^>bD)Znn)^K8U znaCDqsXne@mM-kp+Gv}fT_bYYuq_m4v$-c%h;Rsq4?m? zs_ia@6HFCrlrGn5b-7Vt9dN_E=*B*qHS-u5596#rbS&A4&Oml+{9Z1Vl7dtMlP~C~ zahtHlzHD2yoh#KNd%@18tjI|F~H}*$!W8t=q)ix=tTC>P)?`=!@ipAB>Ezst78^* zJBbN$wqozb*>!Q%`D#6vKPtDvb|2)EO}$`E$OowRF61_K2;-JBXKaTf!pi0PHOx1; zlXH(bZlXJTl$Gr^+t_cgtKGfz*tk@Dp(C$YVTh{Bta z&YCV=K}4V$qM;y|Fd+k}nUgoYHq~nx&TO@XDzrFt%%v1=mI|6YVyQ(^v2~Cln^@@c z!5q)oxN-&6&*Vhoi3|+2O;8vx+>u=4IwBP!=|IiWt)#=0@rT<}EgcV}x_RowF!GEF&KNKvhB$S(V?6KT&GDB$IKeiYd$$@Y`$bF32|@^04^67 z18t>oT}LNw;5adnXOResZgXN=_Q&mRp1@d*rJhw|H#KFZX5gHJk!=~p!eysxN~w$C z@QQP$7!=<-_1dzpxX9QI<%?S zVx%;2TFc#AX`}u>E`2I5z8?PnYn{XQcf5Q*cq6ETNpKC=0`yT{f1#bX{K?V$hXMhXA1>oEG|B?gnZcqo3 z1279VgA2ib5CixY_-F8S@Cop7@Jg@*roj|=F&GBV0S^-g_%`?gkeI;<@KSISxDgx% zPX^y27Vs|cPEZFofH5!v40w`8n_Qg4B-tR0!2^& zqC*l-==@1+hxI5F#h0}>LJrXCEGeR#QPcq$7(PZEKuwVv2I zdgTbsuB$7jx7XJY&1q54TD|H>6OVJdvTK#h0anTqfi!)N7q1)D57RcfxYuf0RJkl; zgvW_+NEq12m1S&ufeiz=!=L5KY;dfP1M4y7YrT946Q;@Hj*ERL%!kTm$0nz57)vj^ zZj=RmW;yQQ0NJT&OM{E1UgqKla|>gWGua~)3e$C-YRaHNSzvuO*I^tia@Lby)(dQ( z?g>BN4mQa9#B^y5fih1#9xbx*9CLm0$%RRFx!^Mbwv@XmZ?>Jv+&ZlWU5Y-eVR}s7 zx+Mt$MW3Z|azkj%!N0XDCfDF(6=`)xG5Z5;EnTtT`i!wXw#{2kaemhN+0RxZE+<05 zIT9*RQhYQ=by#M{kL9E_Ire><4BdunOGI1S4WZ+cWvd2V*^?7|mHwUFke84zY)vvk zD#*KarrA<~weck!r7B4dsGc+?;LtJCGqi10md5MMS&R%_TYHoUb2_z`u$f&Cc#IOv zQs%0fud@i$E+p4Lav$-FpbtknyGnjCPJ-N-EQYnCL6s{C@vx;pH8kDVQAjki8emY` zn{fAm4Cq=}(`IdTJbZhUv!juA)vlJ7GaKkfCR!P7R_c3|&~mAI6k^>DLy=x`=($C< zSN#&8%wmo5Xe^3p(HJMq%|;*EC$W~4j$J;vWlC@DGqJh_U6eqYmaBo6TNOEXC^OJ* zu1(uEhs{Ujiz&|X^Vw=!8%x_pZryjae=*XB@+{LBeu>` zy-iEVmDTOfyi#?XN4Y4AtF~Q~`3`5AQM%BGY-jy4B>&)qGgC;5=i4VwcE+CD#%Mdg zm!%8TXkE5h#gcc{uK7jn880PhxFbI^N}|HPTs}*YEcTeq$#o%QyC7_|G8u$RgvQjY>H?;TKQ!nw zbcs~u{{K=OHofb`|M!2N?;r8+KL*|n?g6L3ZD1N~1wY5X|2%jPco+Cn@M`c%a3dH7 z2D|_~AAAEJ|Lx#bFaoXwY49BIOdvS{mx2fI^*;&j0`j*34ucsWx&P;Z`|#=C3Yy?X zum?!K|MS4l@Z-M^z6-trJ_=5O6>uC#KENaR^4|rL^Z!>s@&Hyq5s1J4V$cVA!E?dI z;3Du;@L7ENcLMo)0k?x0Ah`iofXjgR`wxMSf{%c^z{@}p1mGap54M1(gG+$?O@a60 z`G18k|1kJ6xEGuRli++HKK_4!2f^K-0d|01a0S>5evQ8_`2b%6p8}r% z_ka+{-wAjDcptvH{9S+{Aou>F13jSQ#~KT(|MuU)sl6)Q)csH56~>fOSaZ>cobc zuqD_3J@6)ocAc~-T@ss=FTmcsV`%#=n1qwtP77<+S|uNH7K>!G3zrkdH0%u4w;3~% zGo&=f8A-OfSc&8rM@^*M2yv$$E)e(RsH^88Lc#LN=ZqbaOMkxp#k2h|$!=`DV{QfN zB=pH4*|n56Yy(lQagcN7YjHnC9f_dSjNV`Qm`Xgzu1!5=W_G-J z^;Pa-yG56$!|q7!8XdQXxs_5itCR#QK|V_>v*$W)H0R#in^)@iR~ zlgH*dj5pL#xr39zI8@nD86ST(%JF|k2^aV?6TF^+AsFwtZa>307P;Z1hVAnJoRlZ+ zPd?C}aCfp)O)}fE!1CJ@MNOIHmX^R|O12KW55@f3dL{Xuypvz@vPB-JBau<^+G6d2 zKc0*`@gq@98!<+Su#GuV2>Ug4WRld8#0}->p*e1M$Y#)v4!H~l8(-6%uq`1~9rHv~ zfxiUe2aE+DD|*Y<*vCz!<(lfF@X;qNBotT6tTuHt6+T!_ukkNorM4p@(Wg4`ii`Rj zU-lewMRQp?7zvliI8pw`vT#SLb$EW(x`W7!YDRku)~-aSbdC&f_3<6}GbwF)ZZ)iR zI+jk)UIlR-?Fmjs)lglMt^N`?wT5}FmWWQW##$$z(L-K7XVMGD@Lrd6FQSNrZ|7(a zf_*v;ii}4K5jeg&qIc@$wZ4^mduquq_eSf<3J?n2JDDS|` z+X>rc&xvi|fZ=vOCw##(A^m>0Zk-RajDlrY)r9bRgrE$h!NJQ4A z2}vs<%tNk22_+g9d%eB&!pg~#YNaIQ5ZWfC#SQlOj&?(x?Id$N-cIAQfxD50xAsb; zWf)vYq$N~3fDl6Htyir?BQZ;Q|K=^LGrl%trolxuOo`|)D&8{5TIkzahg4s6n1gtj zL_gdJ>T0pV@ax!(A8(uX+c%=dGAL;ribKNVZAtMbOatQh*4%g8;ET o8cslWI;L&svhk#y8f`qXtVUf6ZTk2aFImPa^=Q?_0ypCont); userdata->pCont = NULL; } -static void destroyCmsg(SCliMsg* pMsg) { +static void destroyCmsg(void* arg) { + SCliMsg* pMsg = arg; if (pMsg == NULL) { return; } @@ -1001,7 +1002,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); transDestroyAsyncPool(pThrd->asyncPool); - transDQDestroy(pThrd->delayQueue); + transDQDestroy(pThrd->delayQueue, destroyCmsg); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 84af8da513..0a2d3a13ff 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -456,7 +456,7 @@ int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) { return 0; } -void transDQDestroy(SDelayQueue* queue) { +void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { taosMemoryFree(queue->timer); while (heapSize(queue->heap) > 0) { @@ -467,6 +467,11 @@ void transDQDestroy(SDelayQueue* queue) { heapRemove(queue->heap, minNode); SDelayTask* task = container_of(minNode, SDelayTask, node); + + STaskArg* arg = task->arg; + freeFunc(arg->param1); + taosMemoryFree(arg); + taosMemoryFree(task); } heapDestroy(queue->heap); From 322bbc493e247474e5626f84ba4793b7f83029fa Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 11:09:54 +0800 Subject: [PATCH 08/21] refactor(sync): add trace log --- source/libs/sync/src/syncElection.c | 11 +- source/libs/sync/src/syncRequestVote.c | 16 ++- source/libs/sync/src/syncRequestVoteReply.c | 107 ++++++++++++++------ 3 files changed, 88 insertions(+), 46 deletions(-) diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 3c4b59ce06..1542f78554 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -120,12 +120,15 @@ int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, con int32_t ret = 0; do { - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); - sDebug("vgId:%d, send sync-request-vote to %s:%d, {term:%" PRIu64 ", last-index:%" PRId64 ", last-term:%" PRIu64 - "}", - pSyncNode->vgId, host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "", host, port, + pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); SRpcMsg rpcMsg; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 1e6e05099c..3eaec65cfe 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -45,8 +45,6 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { int32_t ret = 0; - syncRequestVoteLog2("==syncNodeOnRequestVoteCb==", pMsg); - // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { do { @@ -55,8 +53,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 - ", maybe replica already dropped", + "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 + "}, maybe replica already dropped", host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); syncNodeEventLog(ths, logBuf); } while (0); @@ -98,8 +96,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 - ", reply-grant:%d", + "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 + "}, reply-grant:%d", host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); @@ -220,8 +218,8 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); snprintf(logBuf, sizeof(logBuf), - "recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 - ", maybe replica already dropped", + "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 + "}, maybe replica already dropped", host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); syncNodeEventLog(ths, logBuf); } while (0); @@ -262,7 +260,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 - ", reply-grant:%d}", + "}, reply-grant:%d", host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 8ab4f75c5c..00f6c2972c 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -40,22 +40,41 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0; - // print log - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplyCb== term:%" PRIu64, ths->pRaftStore->currentTerm); - syncRequestVoteReplyLog2(logBuf, pMsg); + // trace log + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", pMsg->term, + pMsg->voteGranted); + syncNodeEventLog(ths, logBuf); + } while (0); // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - sInfo("recv SyncRequestVoteReply, maybe replica already dropped"); - return ret; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", pMsg->term, + pMsg->voteGranted); + syncNodeErrorLog(ths, logBuf); + return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term, - ths->pRaftStore->currentTerm); - return ret; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", pMsg->term, + pMsg->voteGranted); + syncNodeErrorLog(ths, logBuf); + return -1; } // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); @@ -65,12 +84,14 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) // } if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, - pMsg->term, ths->pRaftStore->currentTerm); - syncNodePrint2(logBuf, ths); - sError("%s", logBuf); - return ret; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", + pMsg->term, pMsg->voteGranted); + syncNodeErrorLog(ths, logBuf); + return -1; } ASSERT(pMsg->term == ths->pRaftStore->currentTerm); @@ -99,7 +120,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) } } - return ret; + return 0; } #if 0 @@ -164,22 +185,41 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0; - // print log - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, term:%" PRIu64, ths->pRaftStore->currentTerm); - syncRequestVoteReplyLog2(logBuf, pMsg); + // trace log + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", pMsg->term, + pMsg->voteGranted); + syncNodeEventLog(ths, logBuf); + } while (0); // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { - sInfo("recv SyncRequestVoteReply, maybe replica already dropped"); - return ret; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", pMsg->term, + pMsg->voteGranted); + syncNodeErrorLog(ths, logBuf); + return -1; } // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term, - ths->pRaftStore->currentTerm); - return ret; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", pMsg->term, + pMsg->voteGranted); + syncNodeErrorLog(ths, logBuf); + return -1; } // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); @@ -189,13 +229,14 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl // } if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), - "recv SyncRequestVoteReply, error term, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term, - ths->pRaftStore->currentTerm); - syncNodePrint2(logBuf, ths); - sError("%s", logBuf); - return ret; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", + pMsg->term, pMsg->voteGranted); + syncNodeErrorLog(ths, logBuf); + return -1; } ASSERT(pMsg->term == ths->pRaftStore->currentTerm); @@ -224,5 +265,5 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl } } - return ret; + return 0; } \ No newline at end of file From 54be75ff133db2e0cbdba52e749a4353dc982438 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 11:18:43 +0800 Subject: [PATCH 09/21] refactor(sync): add trace log --- source/libs/sync/src/syncElection.c | 2 ++ source/libs/sync/src/syncMain.c | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 1542f78554..53a708c6c2 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -71,6 +71,8 @@ int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) { } int32_t syncNodeElect(SSyncNode* pSyncNode) { + syncNodeEventLog(pSyncNode, "begin election"); + int32_t ret = 0; if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { syncNodeFollower2Candidate(pSyncNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7f85f0979f..9cd1d62361 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2061,21 +2061,21 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; - syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode); + syncNodeEventLog(pSyncNode, "follower to candidate"); } void syncNodeLeader2Follower(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); syncNodeBecomeFollower(pSyncNode, "leader to follower"); - syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode); + syncNodeEventLog(pSyncNode, "leader to follower"); } void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); syncNodeBecomeFollower(pSyncNode, "candidate to follower"); - syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode); + syncNodeEventLog(pSyncNode, "candidate to follower"); } // raft vote -------------- From c8f5ba85372188071fd8e13726e23ba3f28d98e6 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Wed, 20 Jul 2022 11:27:32 +0800 Subject: [PATCH 10/21] update delete.py --- tests/system-test/1-insert/delete_data.py | 136 +++++++++++++++------- 1 file changed, 95 insertions(+), 41 deletions(-) diff --git a/tests/system-test/1-insert/delete_data.py b/tests/system-test/1-insert/delete_data.py index 27153185b9..4c1426d0b1 100644 --- a/tests/system-test/1-insert/delete_data.py +++ b/tests/system-test/1-insert/delete_data.py @@ -30,8 +30,8 @@ class TDTestCase: self.setsql = TDSetSql() self.stbname = 'stb' self.ntbname = 'ntb' - self.rowNum = 10 - self.tbnum = 20 + self.rowNum = 5 + self.tbnum = 2 self.ts = 1537146000000 self.binary_str = 'taosdata' self.nchar_str = '涛思数据' @@ -109,32 +109,50 @@ class TDTestCase: tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{base_data['binary']}")''') elif 'nchar' in col_type.lower(): tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{base_data['nchar']}")''') - - def delete_all_data(self,tbname,col_type,row_num,base_data,dbname): + def delete_all_data(self,tbname,col_type,row_num,base_data,dbname,tb_type,tb_num=1): tdSql.execute(f'delete from {tbname}') tdSql.execute(f'flush database {dbname}') tdSql.execute('reset query cache') tdSql.query(f'select * from {tbname}') tdSql.checkRows(0) - self.insert_base_data(col_type,tbname,row_num,base_data) + if tb_type == 'ntb' or tb_type == 'ctb': + self.insert_base_data(col_type,tbname,row_num,base_data) + elif tb_type == 'stb': + for i in range(tb_num): + self.insert_base_data(col_type,f'{tbname}_{i}',row_num,base_data) tdSql.execute(f'flush database {dbname}') tdSql.execute('reset query cache') tdSql.query(f'select * from {tbname}') - tdSql.checkRows(row_num) - def delete_one_row(self,tbname,column_type,column_name,base_data,dbname): + if tb_type == 'ntb' or tb_type == 'ctb': + tdSql.checkRows(row_num) + elif tb_type =='stb': + tdSql.checkRows(row_num*tb_num) + def delete_one_row(self,tbname,column_type,column_name,base_data,row_num,dbname,tb_type,tb_num=1): tdSql.execute(f'delete from {tbname} where ts={self.ts}') tdSql.execute(f'flush database {dbname}') tdSql.execute('reset query cache') tdSql.query(f'select {column_name} from {tbname}') - tdSql.checkRows(self.rowNum-1) + if tb_type == 'ntb' or tb_type == 'ctb': + tdSql.checkRows(row_num-1) + elif tb_type == 'stb': + tdSql.checkRows((row_num-1)*tb_num) tdSql.query(f'select {column_name} from {tbname} where ts={self.ts}') tdSql.checkRows(0) - if 'binary' in column_type.lower(): - tdSql.execute(f'''insert into {tbname} values({self.ts},"{base_data['binary']}")''') - elif 'nchar' in column_type.lower(): - tdSql.execute(f'''insert into {tbname} values({self.ts},"{base_data['nchar']}")''') - else: - tdSql.execute(f'insert into {tbname} values({self.ts},{base_data[column_type]})') + if tb_type == 'ntb' or tb_type == 'ctb': + if 'binary' in column_type.lower(): + tdSql.execute(f'''insert into {tbname} values({self.ts},"{base_data['binary']}")''') + elif 'nchar' in column_type.lower(): + tdSql.execute(f'''insert into {tbname} values({self.ts},"{base_data['nchar']}")''') + else: + tdSql.execute(f'insert into {tbname} values({self.ts},{base_data[column_type]})') + elif tb_type == 'stb': + for i in range(tb_num): + if 'binary' in column_type.lower(): + tdSql.execute(f'''insert into {tbname}_{i} values({self.ts},"{base_data['binary']}")''') + elif 'nchar' in column_type.lower(): + tdSql.execute(f'''insert into {tbname}_{i} values({self.ts},"{base_data['nchar']}")''') + else: + tdSql.execute(f'insert into {tbname}_{i} values({self.ts},{base_data[column_type]})') tdSql.query(f'select {column_name} from {tbname} where ts={self.ts}') if column_type.lower() == 'float' or column_type.lower() == 'double': if abs(tdSql.queryResult[0][0] - base_data[column_type]) / base_data[column_type] <= 0.0001: @@ -146,38 +164,56 @@ class TDTestCase: elif 'nchar' in column_type.lower(): tdSql.checkEqual(tdSql.queryResult[0][0],base_data['nchar']) else: - tdSql.checkEqual(tdSql.queryResult[0][0],base_data[column_type]) - - def delete_rows(self,dbname,tbname,col_name,col_type,base_data,rowNum): - for i in range(rowNum): + tdSql.checkEqual(tdSql.queryResult[0][0],base_data[column_type]) + def delete_rows(self,dbname,tbname,col_name,col_type,base_data,row_num,tb_type,tb_num=1): + for i in range(row_num): tdSql.execute(f'delete from {tbname} where ts>{self.ts+i}') tdSql.execute(f'flush database {dbname}') tdSql.execute('reset query cache') tdSql.query(f'select {col_name} from {tbname}') - tdSql.checkRows(i+1) - self.insert_base_data(col_type,tbname,rowNum,base_data) - for i in range(rowNum): + if tb_type == 'ntb' or tb_type == 'ctb': + tdSql.checkRows(i+1) + self.insert_base_data(col_type,tbname,row_num,base_data) + elif tb_type == 'stb': + tdSql.checkRows((i+1)*tb_num) + for j in range(tb_num): + self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data) + for i in range(row_num): tdSql.execute(f'delete from {tbname} where ts>={self.ts+i}') tdSql.execute(f'flush database {dbname}') tdSql.execute('reset query cache') tdSql.query(f'select {col_name} from {tbname}') - tdSql.checkRows(i) - self.insert_base_data(col_type,tbname,rowNum,base_data) - for i in range(rowNum): + if tb_type == 'ntb' or tb_type == 'ctb': + tdSql.checkRows(i) + self.insert_base_data(col_type,tbname,row_num,base_data) + elif tb_type == 'stb': + tdSql.checkRows(i*tb_num) + for j in range(tb_num): + self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data) + for i in range(row_num): tdSql.execute(f'delete from {tbname} where ts<={self.ts+i}') tdSql.execute(f'flush database {dbname}') tdSql.execute('reset query cache') tdSql.query(f'select {col_name} from {tbname}') - tdSql.checkRows(rowNum-i-1) - self.insert_base_data(col_type,tbname,rowNum,base_data) - for i in range(rowNum): + if tb_type == 'ntb' or tb_type == 'ctb': + tdSql.checkRows(row_num-i-1) + self.insert_base_data(col_type,tbname,row_num,base_data) + elif tb_type == 'stb': + tdSql.checkRows((row_num-i-1)*tb_num) + for j in range(tb_num): + self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data) + for i in range(row_num): tdSql.execute(f'delete from {tbname} where ts<{self.ts+i}') tdSql.execute(f'flush database {dbname}') tdSql.execute('reset query cache') tdSql.query(f'select {col_name} from {tbname}') - tdSql.checkRows(rowNum-i) - self.insert_base_data(col_type,tbname,rowNum,base_data) - + if tb_type == 'ntb' or tb_type == 'ctb': + tdSql.checkRows(row_num-i) + self.insert_base_data(col_type,tbname,row_num,base_data) + elif tb_type == 'stb': + tdSql.checkRows((row_num-i)*tb_num) + for j in range(tb_num): + self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data) def delete_error(self,tbname,column_name,column_type,base_data): for error_list in ['',f'ts = {self.ts} and',f'ts = {self.ts} or']: if 'binary' in column_type.lower(): @@ -185,18 +221,19 @@ class TDTestCase: elif 'nchar' in column_type.lower(): tdSql.error(f'''delete from {tbname} where {error_list} {column_name} ="{base_data['nchar']}"''') else: - tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}') - + tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}') def delete_data_ntb(self): tdSql.execute(f'create database if not exists {self.dbname}') tdSql.execute(f'use {self.dbname}') for col_name,col_type in self.column_dict.items(): tdSql.execute(f'create table {self.ntbname} (ts timestamp,{col_name} {col_type})') self.insert_base_data(col_type,self.ntbname,self.rowNum,self.base_data) - self.delete_one_row(self.ntbname,col_type,col_name,self.base_data,self.dbname) - self.delete_all_data(self.ntbname,col_type,self.rowNum,self.base_data,self.dbname) + self.delete_one_row(self.ntbname,col_type,col_name,self.base_data,self.rowNum,self.dbname,'ntb') + self.delete_all_data(self.ntbname,col_type,self.rowNum,self.base_data,self.dbname,'ntb') self.delete_error(self.ntbname,col_name,col_type,self.base_data) - self.delete_rows(self.dbname,self.ntbname,col_name,col_type,self.base_data,self.rowNum) + self.delete_rows(self.dbname,self.ntbname,col_name,col_type,self.base_data,self.rowNum,'ntb') + for func in ['first','last']: + tdSql.query(f'select {func}(*) from {self.ntbname}') tdSql.execute(f'drop table {self.ntbname}') tdSql.execute(f'drop database {self.dbname}') def delete_data_ctb(self): @@ -207,16 +244,33 @@ class TDTestCase: for i in range(self.tbnum): tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags(1)') self.insert_base_data(col_type,f'{self.stbname}_{i}',self.rowNum,self.base_data) - self.delete_one_row(f'{self.stbname}_{i}',col_type,col_name,self.base_data,self.dbname) - self.delete_all_data(f'{self.stbname}_{i}',col_type,self.rowNum,self.base_data,self.dbname) + self.delete_one_row(f'{self.stbname}_{i}',col_type,col_name,self.base_data,self.rowNum,self.dbname,'ctb') + self.delete_all_data(f'{self.stbname}_{i}',col_type,self.rowNum,self.base_data,self.dbname,'ctb') self.delete_error(f'{self.stbname}_{i}',col_name,col_type,self.base_data) - self.delete_rows(self.dbname,f'{self.stbname}_{i}',col_name,col_type,self.base_data,self.rowNum) + self.delete_rows(self.dbname,f'{self.stbname}_{i}',col_name,col_type,self.base_data,self.rowNum,'ctb') + for func in ['first','last']: + tdSql.query(f'select {func}(*) from {self.stbname}_{i}') tdSql.execute(f'drop table {self.stbname}') - + def delete_data_stb(self): + tdSql.execute(f'create database if not exists {self.dbname}') + tdSql.execute(f'use {self.dbname}') + for col_name,col_type in self.column_dict.items(): + tdSql.execute(f'create table {self.stbname} (ts timestamp,{col_name} {col_type}) tags(t1 int)') + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags(1)') + self.insert_base_data(col_type,f'{self.stbname}_{i}',self.rowNum,self.base_data) + self.delete_error(self.stbname,col_name,col_type,self.base_data) + self.delete_one_row(self.stbname,col_type,col_name,self.base_data,self.rowNum,self.dbname,'stb',self.tbnum) + self.delete_all_data(self.stbname,col_type,self.rowNum,self.base_data,self.dbname,'stb',self.tbnum) + self.delete_rows(self.dbname,self.stbname,col_name,col_type,self.base_data,self.rowNum,'stb',self.tbnum) + for func in ['first','last']: + tdSql.query(f'select {func}(*) from {self.stbname}') + tdSql.execute(f'drop table {self.stbname}') + tdSql.execute(f'drop database {self.dbname}') def run(self): self.delete_data_ntb() - # self.delete_data_ctb() - + self.delete_data_ctb() + self.delete_data_stb() def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) From 994089b53e6aca6f8cd628ee65613d2101fe00ba Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Wed, 20 Jul 2022 11:28:26 +0800 Subject: [PATCH 11/21] add case into ci --- tests/system-test/fulltest.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 19658c3a0a..189c2da931 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -33,7 +33,7 @@ python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 1-insert/update_data.py - +python3 ./test.py -f 1-insert/delete_data.py python3 ./test.py -f 2-query/db.py python3 ./test.py -f 2-query/between.py From 503e89f00d7d79ba9ee6ef3c42dc08c6927bcdd9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 20 Jul 2022 11:31:03 +0800 Subject: [PATCH 12/21] fix: avoid rpc mem leak --- source/libs/transport/src/transCli.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fb0598d247..56268b03ef 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -198,6 +198,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } \ destroyCmsg(pMsg); \ cliReleaseUnfinishedMsg(conn); \ + transQueueClear(&conn->cliMsgs); \ addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ return; \ } \ @@ -545,6 +546,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { STrans* pTransInst = thrd->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); + cliReleaseUnfinishedMsg(conn); transQueueClear(&conn->cliMsgs); transCtxCleanup(&conn->ctx); conn->status = ConnInPool; @@ -645,6 +647,7 @@ static void cliDestroy(uv_handle_t* handle) { conn->stream->data = NULL; taosMemoryFree(conn->stream); transCtxCleanup(&conn->ctx); + cliReleaseUnfinishedMsg(conn); transQueueDestroy(&conn->cliMsgs); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); From 4c3e1fdaa91f802dee0661f96532c7e847491745 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 11:34:51 +0800 Subject: [PATCH 13/21] refactor(sync): add trace log --- source/libs/sync/src/syncRequestVoteReply.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 00f6c2972c..40d33959dd 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -191,8 +191,8 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", pMsg->term, - pMsg->voteGranted); + snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, port, + pMsg->term, pMsg->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); From 11b473fe9eed0502a1ea47b2b748833cf63eb96b Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 11:56:46 +0800 Subject: [PATCH 14/21] refactor(sync): add trace log --- source/libs/sync/src/syncRequestVoteReply.c | 24 ++++++++++----------- source/libs/sync/src/syncSnapshot.c | 6 ++++++ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 40d33959dd..299b5986df 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -46,8 +46,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", pMsg->term, - pMsg->voteGranted); + snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, port, + pMsg->term, pMsg->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); @@ -58,8 +58,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", pMsg->term, - pMsg->voteGranted); + "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, + pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; } @@ -71,8 +71,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", pMsg->term, - pMsg->voteGranted); + "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, + pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; } @@ -89,7 +89,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", - pMsg->term, pMsg->voteGranted); + host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; } @@ -203,8 +203,8 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", pMsg->term, - pMsg->voteGranted); + "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, + pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; } @@ -216,8 +216,8 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", pMsg->term, - pMsg->voteGranted); + "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, + pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; } @@ -234,7 +234,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", - pMsg->term, pMsg->voteGranted); + host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 924a4df90d..279a70cb19 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -573,6 +573,12 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; } + // maybe update term + if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) { + pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm; + raftStorePersist(pReceiver->pSyncNode->pRaftStore); + } + // stop writer, apply data code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, &(pReceiver->snapshot)); From b65d55daac7cb8fc4989cbdeb5a029c3c8b2ade0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 20 Jul 2022 13:10:29 +0800 Subject: [PATCH 15/21] test: valgrind case --- tests/script/tsim/valgrind/checkError3.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/tsim/valgrind/checkError3.sim b/tests/script/tsim/valgrind/checkError3.sim index 52ef01785e..e8b25098d6 100644 --- a/tests/script/tsim/valgrind/checkError3.sim +++ b/tests/script/tsim/valgrind/checkError3.sim @@ -90,7 +90,7 @@ $null= system_content sh/checkValgrind.sh -n dnode1 print cmd return result ----> [ $system_content ] -if $system_content > 0 then +if $system_content > 2 then return -1 endi From 26c6db230ea9613bd7431dac7b74e481dc44b54d Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 20 Jul 2022 13:22:49 +0800 Subject: [PATCH 16/21] fix(query): twa function handling null constant or all null column TD-17562 --- source/libs/function/src/builtinsimpl.c | 67 +++++++++++++++++++------ 1 file changed, 51 insertions(+), 16 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 9534b2c7b3..dfc5b6a363 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -165,6 +165,7 @@ typedef struct SElapsedInfo { typedef struct STwaInfo { double dOutput; + bool isNull; SPoint1 p; STimeWindow win; } STwaInfo; @@ -5181,8 +5182,9 @@ bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { } STwaInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - pInfo->p.key = INT64_MIN; - pInfo->win = TSWINDOW_INITIALIZER; + pInfo->isNull = false; + pInfo->p.key = INT64_MIN; + pInfo->win = TSWINDOW_INITIALIZER; return true; } @@ -5214,21 +5216,36 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { (pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC)); ASSERT(last->key == INT64_MIN); - last->key = tsList[i]; + for (; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } - GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); + last->key = tsList[i]; - pInfo->dOutput += twa_get_area(pCtx->start, *last); - pInfo->win.skey = pCtx->start.key; - numOfElems++; - i += 1; + GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); + + pInfo->dOutput += twa_get_area(pCtx->start, *last); + pInfo->win.skey = pCtx->start.key; + numOfElems++; + i += 1; + break; + } } else if (pInfo->p.key == INT64_MIN) { - last->key = tsList[i]; - GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); + for (; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } - pInfo->win.skey = last->key; - numOfElems++; - i += 1; + last->key = tsList[i]; + + GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); + + pInfo->win.skey = last->key; + numOfElems++; + i += 1; + break; + } } SPoint1 st = {0}; @@ -5241,6 +5258,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5255,6 +5273,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5268,6 +5287,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5281,6 +5301,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5294,6 +5315,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5307,6 +5329,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5320,6 +5343,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5333,6 +5357,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5346,6 +5371,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5359,6 +5385,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } + numOfElems++; INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); @@ -5366,6 +5393,10 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } break; } + case TSDB_DATA_TYPE_NULL: { + pInfo->isNull = true; + break; + } default: ASSERT(0); @@ -5379,7 +5410,11 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { pInfo->win.ekey = pInfo->p.key; - SET_VAL(pResInfo, numOfElems, 1); + if (numOfElems == 0) { + pInfo->isNull = true; + } + + SET_VAL(pResInfo, 1, 1); return TSDB_CODE_SUCCESS; } @@ -5400,8 +5435,8 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); STwaInfo* pInfo = (STwaInfo*)GET_ROWCELL_INTERBUF(pResInfo); - if (pResInfo->numOfRes == 0) { - pResInfo->isNullRes = 1; + if (pInfo->isNull == true) { + pResInfo->numOfRes = 0; } else { if (pInfo->win.ekey == pInfo->win.skey) { pInfo->dOutput = pInfo->p.val; From 9fa47f7e676ed1662310949e9a437a7d0896745d Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 20 Jul 2022 13:22:49 +0800 Subject: [PATCH 17/21] fix(query): twa function handling null constant or all null column TD-17562 --- source/libs/function/src/builtinsimpl.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index dfc5b6a363..7a76e1136e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5210,6 +5210,11 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { SPoint1* last = &pInfo->p; int32_t numOfElems = 0; + if (IS_NULL_TYPE(pInputCol->info.type)) { + pInfo->isNull = true; + goto _twa_over; + } + int32_t i = pInput->startRowIndex; if (pCtx->start.key != INT64_MIN) { ASSERT((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) || @@ -5393,10 +5398,6 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } break; } - case TSDB_DATA_TYPE_NULL: { - pInfo->isNull = true; - break; - } default: ASSERT(0); @@ -5410,6 +5411,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { pInfo->win.ekey = pInfo->p.key; +_twa_over: if (numOfElems == 0) { pInfo->isNull = true; } From 90e7d794f3da673c3fc0c0b37c6cd82a1b07516e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 20 Jul 2022 13:32:38 +0800 Subject: [PATCH 18/21] fix(sync): reset commit index by snapshot when open sync --- source/libs/sync/src/syncMain.c | 13 ++++++++++++- source/libs/sync/src/syncRequestVoteReply.c | 20 ++++++++++---------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9cd1d62361..d55a385bb8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -999,7 +999,18 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init TLA+ log vars pSyncNode->pLogStore = logStoreCreate(pSyncNode); ASSERT(pSyncNode->pLogStore != NULL); - pSyncNode->commitIndex = SYNC_INDEX_INVALID; + + SyncIndex commitIndex = SYNC_INDEX_INVALID; + if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + SSnapshot snapshot = {0}; + int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + ASSERT(code == 0); + if (snapshot.lastApplyIndex > commitIndex) { + commitIndex = snapshot.lastApplyIndex; + syncNodeEventLog(pSyncNode, "reset commit index by snapshot"); + } + } + pSyncNode->commitIndex = commitIndex; // timer ms init pSyncNode->pingBaseLine = PING_TIMER_MS; diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 299b5986df..9ae70ca8da 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -46,8 +46,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, port, - pMsg->term, pMsg->voteGranted); + snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, + port, pMsg->term, pMsg->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); @@ -58,7 +58,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, + "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -71,7 +71,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, + "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -88,7 +88,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", + snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -191,8 +191,8 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, port, - pMsg->term, pMsg->voteGranted); + snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, + port, pMsg->term, pMsg->voteGranted); syncNodeEventLog(ths, logBuf); } while (0); @@ -203,7 +203,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, + "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -216,7 +216,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, + "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; @@ -233,7 +233,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", + snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term", host, port, pMsg->term, pMsg->voteGranted); syncNodeErrorLog(ths, logBuf); return -1; From bb021697ee3e2db8bb9c20956f6ef13643b76043 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 20 Jul 2022 14:04:07 +0800 Subject: [PATCH 19/21] fix max_partion.py test case --- tests/system-test/2-query/max_partition.py | 54 +++++++++++----------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/tests/system-test/2-query/max_partition.py b/tests/system-test/2-query/max_partition.py index 0a7214ec75..a352865c45 100644 --- a/tests/system-test/2-query/max_partition.py +++ b/tests/system-test/2-query/max_partition.py @@ -11,13 +11,13 @@ class TDTestCase: self.row_nums = 10 self.tb_nums = 10 self.ts = 1537146000000 - + def prepare_datas(self, stb_name , tb_nums , row_nums ): tdSql.execute(" use db ") tdSql.execute(f" create stable {stb_name} (ts timestamp , c1 int , c2 bigint , c3 float , c4 double , c5 smallint , c6 tinyint , c7 bool , c8 binary(36) , c9 nchar(36) , uc1 int unsigned,\ uc2 bigint unsigned ,uc3 smallint unsigned , uc4 tinyint unsigned ) tags(t1 timestamp , t2 int , t3 bigint , t4 float , t5 double , t6 smallint , t7 tinyint , t8 bool , t9 binary(36)\ , t10 nchar(36) , t11 int unsigned , t12 bigint unsigned ,t13 smallint unsigned , t14 tinyint unsigned ) ") - + for i in range(tb_nums): tbname = f"sub_{stb_name}_{i}" ts = self.ts + i*10000 @@ -30,7 +30,7 @@ class TDTestCase: for null in range(5): ts = self.ts + row_nums*1000 + null*1000 tdSql.execute(f"insert into {tbname} values({ts} , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL , NULL )") - + def basic_query(self): tdSql.query("select count(*) from stb") tdSql.checkData(0,0,(self.row_nums + 5 )*self.tb_nums) @@ -44,7 +44,7 @@ class TDTestCase: tdSql.query(" select max(t2) from stb group by c1 order by t1 ") tdSql.query(" select max(c1) from stb group by tbname order by tbname ") tdSql.checkRows(self.tb_nums) - # bug need fix + # bug need fix tdSql.query(" select max(t2) from stb group by t2 order by t2 ") tdSql.checkRows(self.tb_nums) tdSql.query(" select max(c1) from stb group by c1 order by c1 ") @@ -62,8 +62,8 @@ class TDTestCase: # bug need fix # tdSql.query(" select tbname , max(c1) from sub_stb_1 where c1 is null group by c1 order by c1 desc ") - # tdSql.checkRows(1) - # tdSql.checkData(0,0,"sub_stb_1") + # tdSql.checkRows(1) + # tdSql.checkData(0,0,"sub_stb_1") tdSql.query("select max(c1) ,c2 ,t2,tbname from stb group by abs(c1) order by abs(c1)") tdSql.checkRows(self.row_nums+1) @@ -80,7 +80,7 @@ class TDTestCase: tdSql.checkRows(2) tdSql.query(" select max(c1) from stb where abs(c1+t2)=1 partition by tbname ") tdSql.checkRows(2) - + tdSql.query(" select tbname , max(c1) from stb partition by tbname order by tbname ") tdSql.checkRows(self.tb_nums) tdSql.checkData(0,1,self.row_nums-1) @@ -89,7 +89,7 @@ class TDTestCase: tdSql.query("select tbname , max(t2) from stb partition by t1 order by t1") tdSql.query("select tbname , max(t2) from stb partition by t2 order by t2") - # # bug need fix + # # bug need fix tdSql.query("select t2 , max(t2) from stb partition by t2 order by t2") tdSql.checkRows(self.tb_nums) @@ -97,7 +97,7 @@ class TDTestCase: tdSql.checkRows(self.tb_nums) tdSql.checkData(0,1,self.row_nums-1) - + tdSql.query("select tbname , max(c1) from stb partition by t2 order by t2") tdSql.query("select c2, max(c1) from stb partition by c2 order by c2 desc") @@ -125,10 +125,10 @@ class TDTestCase: tdSql.checkRows(self.tb_nums) tdSql.checkData(0,0,self.row_nums) - # bug need fix + # bug need fix tdSql.query("select count(c1) , max(t2) ,abs(c1) from stb partition by abs(c1) order by abs(c1)") tdSql.checkRows(self.row_nums+1) - + tdSql.query("select max(ceil(c2)) , max(floor(t2)) ,max(floor(c2)) from stb partition by abs(c2) order by abs(c2)") tdSql.checkRows(self.row_nums+1) @@ -148,15 +148,15 @@ class TDTestCase: tdSql.query(" select c1 , sample(c1,2) from stb partition by tbname order by tbname ") tdSql.checkRows(self.tb_nums*2) - - # interval + + # interval tdSql.query("select max(c1) from stb interval(2s) sliding(1s)") # bug need fix tdSql.query('select max(c1) from stb where ts>="2022-07-06 16:00:00.000 " and ts < "2022-07-06 17:00:00.000 " interval(50s) sliding(30s) fill(NULL)') - + tdSql.query(" select tbname , count(c1) from stb partition by tbname interval(10s) slimit 5 soffset 1 ") tdSql.query("select tbname , max(c1) from stb partition by tbname interval(10s)") @@ -179,12 +179,12 @@ class TDTestCase: tdSql.query("select c1 , sample(c1,2) from stb partition by c1 order by c1") tdSql.checkRows(21) - # bug need fix + # bug need fix # tdSql.checkData(0,1,None) tdSql.query("select c1 , twa(c1) from stb partition by c1 order by c1") tdSql.checkRows(11) - tdSql.checkData(0,1,0.000000000) + tdSql.checkData(0,1,None) tdSql.query("select c1 , irate(c1) from stb partition by c1 order by c1") tdSql.checkRows(11) @@ -192,7 +192,7 @@ class TDTestCase: tdSql.query("select c1 , DERIVATIVE(c1,2,1) from stb partition by c1 order by c1") tdSql.checkRows(72) - # bug need fix + # bug need fix # tdSql.checkData(0,1,None) @@ -201,15 +201,15 @@ class TDTestCase: - # bug need fix - # tdSql.query(" select tbname , max(c1) from stb partition by tbname order by tbname slimit 5 soffset 0 ") + # bug need fix + # tdSql.query(" select tbname , max(c1) from stb partition by tbname order by tbname slimit 5 soffset 0 ") # tdSql.checkRows(5) - + # tdSql.query(" select tbname , max(c1) from stb partition by tbname order by tbname slimit 5 soffset 1 ") - # tdSql.checkRows(5) - - tdSql.query(" select tbname , max(c1) from sub_stb_1 partition by tbname interval(10s) sliding(5s) ") - + # tdSql.checkRows(5) + + tdSql.query(" select tbname , max(c1) from sub_stb_1 partition by tbname interval(10s) sliding(5s) ") + tdSql.query(f'select max(c1) from stb where ts>={self.ts} and ts < {self.ts}+1000 interval(50s) sliding(30s)') tdSql.query(f'select tbname , max(c1) from stb where ts>={self.ts} and ts < {self.ts}+1000 interval(50s) sliding(30s)') @@ -219,18 +219,18 @@ class TDTestCase: self.prepare_datas("stb",self.tb_nums,self.row_nums) self.basic_query() - # # coverage case for taosd crash about bug fix + # # coverage case for taosd crash about bug fix tdSql.query(" select sum(c1) from stb where t2+10 >1 ") tdSql.query(" select count(c1),count(t1) from stb where -t2<1 ") tdSql.query(" select tbname ,max(ceil(c1)) from stb group by tbname ") tdSql.query(" select avg(abs(c1)) , tbname from stb group by tbname ") tdSql.query(" select t1,c1 from stb where abs(t2+c1)=1 ") - + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file +tdCases.addLinux(__file__, TDTestCase()) From 01385a4beb2b6aca4b990a5cd2ac5b1fced5181b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 20 Jul 2022 14:07:52 +0800 Subject: [PATCH 20/21] fix twa.py --- tests/system-test/2-query/twa.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/system-test/2-query/twa.py b/tests/system-test/2-query/twa.py index dde903af00..108f955977 100644 --- a/tests/system-test/2-query/twa.py +++ b/tests/system-test/2-query/twa.py @@ -7,7 +7,7 @@ import platform import math class TDTestCase: - updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 , + updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 , "jniDebugFlag":143 ,"simDebugFlag":143,"dDebugFlag":143, "dDebugFlag":143,"vDebugFlag":143,"mDebugFlag":143,"qDebugFlag":143, "wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"udfDebugFlag":143, "maxTablesPerVnode":2 ,"minTablesPerVnode":2,"tableIncStepPerVnode":2 } @@ -22,7 +22,7 @@ class TDTestCase: self.time_step = 1000 def prepare_datas_of_distribute(self): - + # prepate datas for 20 tables distributed at different vgroups tdSql.execute("create database if not exists testdb keep 3650 duration 1000 vgroups 5") tdSql.execute(" use testdb ") @@ -32,16 +32,16 @@ class TDTestCase: tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32)) ''' ) - + for i in range(self.tb_nums): tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )') ts = self.ts for j in range(self.row_nums): - ts+=j*self.time_step + ts+=j*self.time_step tdSql.execute( f"insert into ct{i+1} values({ts}, 1, 11111, 111, 1, 1.11, 11.11, 2, 'binary{j}', 'nchar{j}', now()+{1*j}a )" ) - + tdSql.execute("insert into ct1 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") tdSql.execute("insert into ct1 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") tdSql.execute("insert into ct1 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") @@ -64,7 +64,7 @@ class TDTestCase: vgroups = tdSql.queryResult vnode_tables={} - + for vgroup_id in vgroups: vnode_tables[vgroup_id[0]]=[] @@ -73,7 +73,7 @@ class TDTestCase: table_names = tdSql.queryResult tablenames = [] for table_name in table_names: - vnode_tables[table_name[6]].append(table_name[0]) + vnode_tables[table_name[6]].append(table_name[0]) self.vnode_disbutes = vnode_tables count = 0 @@ -103,12 +103,12 @@ class TDTestCase: tdSql.checkRows(self.tb_nums) tdSql.checkData(0,0,1.000000000) - # union all + # union all tdSql.query(" select twa(c1) from stb1 partition by tbname union all select twa(c1) from stb1 partition by tbname ") tdSql.checkRows(40) tdSql.checkData(0,0,1.000000000) - # join + # join tdSql.execute(" create database if not exists db ") tdSql.execute(" use db ") @@ -116,7 +116,7 @@ class TDTestCase: tdSql.execute(" create table tb1 using st tags(1) ") tdSql.execute(" create table tb2 using st tags(2) ") - + for i in range(10): ts = i*10 + self.ts tdSql.execute(f" insert into tb1 values({ts},{i},{i}.0)") @@ -127,7 +127,7 @@ class TDTestCase: tdSql.checkData(0,0,4.500000000) tdSql.checkData(0,1,4.500000000) - # group by + # group by tdSql.execute(" use testdb ") # mixup with other functions @@ -141,7 +141,7 @@ class TDTestCase: self.check_distribute_datas() self.twa_support_types() self.distribute_twa_query() - + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) From 83d772130856089353e975eaa49d811e50a5ec51 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 20 Jul 2022 14:43:52 +0800 Subject: [PATCH 21/21] test: add test case for tmq --- tests/system-test/7-tmq/tmqAutoCreateTbl.py | 38 ++++--- tests/system-test/7-tmq/tmqDnodeRestart.py | 112 ++++++++++---------- tests/system-test/fulltest.sh | 2 +- 3 files changed, 84 insertions(+), 68 deletions(-) diff --git a/tests/system-test/7-tmq/tmqAutoCreateTbl.py b/tests/system-test/7-tmq/tmqAutoCreateTbl.py index 8fcb57aea6..ba2066e742 100644 --- a/tests/system-test/7-tmq/tmqAutoCreateTbl.py +++ b/tests/system-test/7-tmq/tmqAutoCreateTbl.py @@ -16,6 +16,7 @@ from tmqCommon import * class TDTestCase: def __init__(self): + self.snapshot = 0 self.vgroups = 4 self.ctbNum = 1000 self.rowsPerTbl = 1000 @@ -44,7 +45,7 @@ class TDTestCase: 'pollDelay': 3, 'showMsg': 1, 'showRow': 1, - 'snapshot': 1} + 'snapshot': 0} paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum @@ -84,13 +85,14 @@ class TDTestCase: 'ctbStartIdx': 0, 'ctbNum': 1000, 'rowsPerTbl': 1000, - 'batchNum': 400, + 'batchNum': 1000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, 'showMsg': 1, 'showRow': 1, - 'snapshot': 1} + 'snapshot': 0} + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl @@ -131,10 +133,10 @@ class TDTestCase: totalConsumeRows += resultList[i] tdSql.query(queryString) - totalRowsInserted = tdSql.getRows() + totalRowsFromQuery = tdSql.getRows() - if totalConsumeRows != totalRowsInserted: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery)) + if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) @@ -163,6 +165,7 @@ class TDTestCase: 'showRow': 1, 'snapshot': 0} + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl @@ -180,12 +183,13 @@ class TDTestCase: # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' - queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # queryString = "select ts, c1, c2 from %s.%s "%(paraDict['dbName'], paraDict['stbName']) + queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName']) sqlString = "create topic %s as %s" %(topicFromStb1, queryString) tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - consumerId = 0 + consumerId = 1 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 topicList = topicFromStb1 ifcheckdata = 0 @@ -210,10 +214,10 @@ class TDTestCase: totalConsumeRows += resultList[i] tdSql.query(queryString) - totalRowsInserted = tdSql.getRows() + totalRowsFromQuery = tdSql.getRows() - if totalConsumeRows != totalRowsInserted: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery)) + if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) @@ -222,10 +226,18 @@ class TDTestCase: def run(self): - tdSql.prepare() self.prepareTestEnv() + tdLog.printNoPrefix("=============================================") + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() - # self.tmqCase2() # TD-17267 + self.tmqCase2() + + self.prepareTestEnv() + tdLog.printNoPrefix("====================================================================") + tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") + self.snapshot = 1 + self.tmqCase1() + self.tmqCase2() def stop(self): diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py index 8354991578..9699c4b32c 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart.py @@ -16,6 +16,7 @@ from tmqCommon import * class TDTestCase: def __init__(self): + self.snapshot = 0 self.vgroups = 2 self.ctbNum = 100 self.rowsPerTbl = 10000 @@ -37,15 +38,16 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 500, - 'rowsPerTbl': 1000, - 'batchNum': 500, + 'ctbNum': 100, + 'rowsPerTbl': 10000, + 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 3, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} + paraDict['snapshot'] = self.snapshot paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl @@ -81,30 +83,31 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1000, - 'rowsPerTbl': 1000, - 'batchNum': 400, + 'ctbNum': 100, + 'rowsPerTbl': 10000, + 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, 'showMsg': 1, 'showRow': 1, - 'snapshot': 1} + 'snapshot': 0} - # paraDict['vgroups'] = self.vgroups - # paraDict['ctbNum'] = self.ctbNum - # paraDict['rowsPerTbl'] = self.rowsPerTbl + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl - tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) - tdLog.info("create stb") - tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) - tdLog.info("create ctb") - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], - ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("insert data") - tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.initConsumerTable() + # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + # tdLog.info("create stb") + # tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + # tdLog.info("create ctb") + # tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + # ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + # tdLog.info("insert data") + # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' @@ -132,7 +135,7 @@ class TDTestCase: tdLog.info("================= restart dnode ===========================") tdDnodes.stop(1) tdDnodes.start(1) - time.sleep(5) + time.sleep(3) tdLog.info("insert process end, and start to check consume result") expectRows = 1 @@ -142,10 +145,10 @@ class TDTestCase: totalConsumeRows += resultList[i] tdSql.query(queryString) - totalRowsInserted = tdSql.getRows() + totalRowsFromQury = tdSql.getRows() - if totalConsumeRows != totalRowsInserted: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQury)) + if totalConsumeRows != totalRowsFromQury: tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) @@ -165,30 +168,31 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1000, - 'rowsPerTbl': 1000, - 'batchNum': 1000, + 'ctbNum': 100, + 'rowsPerTbl': 10000, + 'batchNum': 3000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, 'showMsg': 1, 'showRow': 1, - 'snapshot': 1} + 'snapshot': 0} - # paraDict['vgroups'] = self.vgroups - # paraDict['ctbNum'] = self.ctbNum - # paraDict['rowsPerTbl'] = self.rowsPerTbl + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) - tdLog.info("create stb") - tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) - tdLog.info("create ctb") - tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], - ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) - tdLog.info("insert data") - tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], - ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], - startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + # tdLog.info("create stb") + # tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + # tdLog.info("create ctb") + # tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + # ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + # tdLog.info("insert data") + # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("create topics from stb1") topicFromStb1 = 'topic_stb1' queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) @@ -196,29 +200,29 @@ class TDTestCase: tdLog.info("create topic sql: %s"%sqlString) tdSql.execute(sqlString) - consumerId = 0 - expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + consumerId = 1 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + 100000 topicList = topicFromStb1 ifcheckdata = 0 ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:true,\ - auto.commit.interval.ms:1000,\ + auto.commit.interval.ms:3000,\ auto.offset.reset:earliest' tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) - tdLog.info("create some new child table and insert data ") - tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) - tmqCom.getStartCommitNotifyFromTmqsim() tdLog.info("================= restart dnode ===========================") tdDnodes.stop(1) tdDnodes.start(1) - time.sleep(5) + time.sleep(3) + tdLog.info("create some new child table and insert data ") + tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) + tdLog.info("insert process end, and start to check consume result") expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) @@ -227,10 +231,10 @@ class TDTestCase: totalConsumeRows += resultList[i] tdSql.query(queryString) - totalRowsInserted = tdSql.getRows() + totalRowsFromQuery = tdSql.getRows() - if totalConsumeRows != totalRowsInserted: - tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery)) + if totalConsumeRows != totalRowsFromQuery: tdLog.exit("tmq consume rows error!") tdSql.query("drop topic %s"%topicFromStb1) @@ -239,8 +243,8 @@ class TDTestCase: def run(self): tdSql.prepare() - - self.tmqCase1() + self.prepareTestEnv() + # self.tmqCase1() self.tmqCase2() def stop(self): diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 6639376485..441427847a 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -184,7 +184,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py -python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py +#python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py #python3 ./test.py -f 7-tmq/tmqDnodeRestart.py python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py