Merge pull request #29784 from taosdata/fix/TS-6017-3.0m
fix: timeseries statistics and test case
This commit is contained in:
commit
6dedab4bba
|
@ -402,6 +402,7 @@ int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
|
|||
|
||||
if (*ppEntry) { // update
|
||||
(*ppEntry)->info.ctbNum = pInfo->ctbNum;
|
||||
(*ppEntry)->info.colNum = pInfo->colNum;
|
||||
} else { // insert
|
||||
if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
|
||||
TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
|
||||
|
|
|
@ -10,14 +10,16 @@
|
|||
|
||||
#include "meta.h"
|
||||
|
||||
extern SDmNotifyHandle dmNotifyHdl;
|
||||
|
||||
int32_t metaCloneEntry(const SMetaEntry *pEntry, SMetaEntry **ppEntry);
|
||||
void metaCloneEntryFree(SMetaEntry **ppEntry);
|
||||
void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
|
||||
int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||
int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||
void metaTimeSeriesNotifyCheck(SMeta *pMeta);
|
||||
int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
|
||||
static void metaTimeSeriesNotifyCheck(SMeta *pMeta);
|
||||
static int32_t metaGetChildUidsOfSuperTable(SMeta *pMeta, tb_uid_t suid, SArray **childList);
|
||||
static int32_t metaFetchTagIdxKey(SMeta *pMeta, const SMetaEntry *pEntry, const SSchema *pTagColumn,
|
||||
STagIdxKey **ppTagIdxKey, int32_t *pTagIdxKeySize);
|
||||
|
@ -990,6 +992,20 @@ static int32_t metaTtlIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
|
||||
#if defined(TD_ENTERPRISE)
|
||||
int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
|
||||
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
|
||||
if (deltaTS > tsTimeSeriesThreshold) {
|
||||
if (0 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 2)) {
|
||||
if (tsem_post(&dmNotifyHdl.sem) != 0) {
|
||||
metaError("vgId:%d, failed to post semaphore, errno:%d", TD_VID(pMeta->pVnode), errno);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static int32_t (*metaTableOpFn[META_TABLE_MAX][META_TABLE_OP_MAX])(SMeta *pMeta, const SMetaHandleParam *pParam) =
|
||||
{
|
||||
[META_ENTRY_TABLE] =
|
||||
|
@ -1139,6 +1155,7 @@ static int32_t metaHandleNormalTableCreate(SMeta *pMeta, const SMetaEntry *pEntr
|
|||
metaError("vgId:%d, failed to create table:%s since %s", TD_VID(pMeta->pVnode), pEntry->name, tstrerror(rc));
|
||||
}
|
||||
}
|
||||
metaTimeSeriesNotifyCheck(pMeta);
|
||||
} else {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
}
|
||||
|
@ -1214,7 +1231,7 @@ static int32_t metaHandleChildTableCreate(SMeta *pMeta, const SMetaEntry *pEntry
|
|||
if (ret < 0) {
|
||||
metaErr(TD_VID(pMeta->pVnode), ret);
|
||||
}
|
||||
pMeta->pVnode->config.vndStats.numOfNTimeSeries += (nCols - 1);
|
||||
pMeta->pVnode->config.vndStats.numOfTimeSeries += (nCols > 0 ? nCols - 1 : 0);
|
||||
}
|
||||
|
||||
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
|
||||
|
@ -1228,7 +1245,7 @@ static int32_t metaHandleChildTableCreate(SMeta *pMeta, const SMetaEntry *pEntry
|
|||
} else {
|
||||
metaErr(TD_VID(pMeta->pVnode), code);
|
||||
}
|
||||
|
||||
metaTimeSeriesNotifyCheck(pMeta);
|
||||
metaFetchEntryFree(&pSuperEntry);
|
||||
return code;
|
||||
}
|
||||
|
@ -1595,6 +1612,10 @@ static int32_t metaHandleSuperTableUpdateImpl(SMeta *pMeta, SMetaHandleParam *pP
|
|||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
metaUpdateStbStats(pMeta, pEntry->uid, 0, pEntry->stbEntry.schemaRow.nCols - pOldEntry->stbEntry.schemaRow.nCols);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1673,7 +1694,16 @@ static int32_t metaHandleSuperTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry
|
|||
|
||||
tsdbCacheInvalidateSchema(pTsdb, pEntry->uid, -1, pEntry->stbEntry.schemaRow.version);
|
||||
}
|
||||
|
||||
if (updStat) {
|
||||
int64_t ctbNum = 0;
|
||||
int32_t ret = metaGetStbStats(pMeta->pVnode, pEntry->uid, &ctbNum, NULL);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to get stb stats:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pEntry->name,
|
||||
pEntry->uid, tstrerror(ret));
|
||||
}
|
||||
pMeta->pVnode->config.vndStats.numOfTimeSeries += (ctbNum * deltaCol);
|
||||
if (deltaCol > 0) metaTimeSeriesNotifyCheck(pMeta);
|
||||
}
|
||||
metaFetchEntryFree(&pOldEntry);
|
||||
return code;
|
||||
}
|
||||
|
@ -1772,7 +1802,9 @@ static int32_t metaHandleNormalTableUpdate(SMeta *pMeta, const SMetaEntry *pEntr
|
|||
#endif
|
||||
tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, pEntry->uid, pEntry->ntbEntry.schemaRow.version);
|
||||
}
|
||||
metaTimeSeriesNotifyCheck(pMeta);
|
||||
int32_t deltaCol = pEntry->ntbEntry.schemaRow.nCols - pOldEntry->ntbEntry.schemaRow.nCols;
|
||||
pMeta->pVnode->config.vndStats.numOfNTimeSeries += deltaCol;
|
||||
if (deltaCol > 0) metaTimeSeriesNotifyCheck(pMeta);
|
||||
metaFetchEntryFree(&pOldEntry);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -261,20 +261,6 @@ _exception:
|
|||
return code;
|
||||
}
|
||||
|
||||
void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
|
||||
#if defined(TD_ENTERPRISE)
|
||||
int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
|
||||
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
|
||||
if (deltaTS > tsTimeSeriesThreshold) {
|
||||
if (0 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 2)) {
|
||||
if (tsem_post(&dmNotifyHdl.sem) != 0) {
|
||||
metaError("vgId:%d, failed to post semaphore, errno:%d", TD_VID(pMeta->pVnode), errno);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static int32_t metaDropTables(SMeta *pMeta, SArray *tbUids) {
|
||||
int32_t code = 0;
|
||||
if (taosArrayGetSize(tbUids) == 0) return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -378,10 +378,6 @@ static int32_t metaCreateChildTable(SMeta *pMeta, int64_t version, SVCreateTbReq
|
|||
pReq->ctb.suid, version);
|
||||
}
|
||||
return code;
|
||||
|
||||
#if 0
|
||||
metaTimeSeriesNotifyCheck(pMeta);
|
||||
#endif
|
||||
}
|
||||
|
||||
// Drop Child Table
|
||||
|
@ -489,9 +485,6 @@ static int32_t metaCreateNormalTable(SMeta *pMeta, int64_t version, SVCreateTbRe
|
|||
__func__, __FILE__, __LINE__, tstrerror(code), pReq->uid, pReq->name, version);
|
||||
}
|
||||
TAOS_RETURN(code);
|
||||
#if 0
|
||||
metaTimeSeriesNotifyCheck(pMeta);
|
||||
#endif
|
||||
}
|
||||
|
||||
// Drop Normal Table
|
||||
|
|
|
@ -135,8 +135,38 @@ class TDTestCase:
|
|||
port = dnode.cfgDict["serverPort"]
|
||||
config_dir = dnode.cfgDir
|
||||
return taos.connect(host=host, port=int(port), config=config_dir)
|
||||
|
||||
def getShowGrantsTimeSeries(self, maxRetry=10):
|
||||
for nRetry in range(maxRetry):
|
||||
tdSql.query("show grants")
|
||||
timeseries = tdSql.queryResult[0][5]
|
||||
tdSql.query("show grants full")
|
||||
full_timeseries = tdSql.queryResult[1][3]
|
||||
if timeseries == full_timeseries:
|
||||
return int(timeseries.split('/')[0])
|
||||
else:
|
||||
tdLog.info(f"timeseries: {timeseries}, != full_timeseries: {full_timeseries}, retry: {nRetry}")
|
||||
time.sleep(1)
|
||||
raise Exception("Timeseries not equal within {maxRetry} seconds")
|
||||
|
||||
def s1_check_alive(self):
|
||||
def getTablesTimeSeries(self):
|
||||
tdSql.query(f"select cast(sum(columns-1) as int) as tss from information_schema.ins_tables where db_name not in ('information_schema', 'performance_schema', 'audit')")
|
||||
return int(tdSql.queryResult[0][0])
|
||||
|
||||
def checkGrantsTimeSeries(self, prompt="", nExpectedTimeSeries=0, maxRetry=10):
|
||||
for nRetry in range(maxRetry):
|
||||
tss_grant = self.getShowGrantsTimeSeries()
|
||||
if tss_grant == nExpectedTimeSeries:
|
||||
tss_table = self.getTablesTimeSeries()
|
||||
if tss_grant == tss_table:
|
||||
tdLog.info(f"{prompt}: tss_grant: {tss_grant} == tss_table: {tss_table}")
|
||||
return
|
||||
else:
|
||||
raise Exception(f"{prompt}: tss_grant: {tss_grant} != tss_table: {tss_table}")
|
||||
time.sleep(1)
|
||||
raise Exception(f"{prompt}: tss_grant: {tss_grant} != nExpectedTimeSeries: {nExpectedTimeSeries}")
|
||||
|
||||
def s1_check_timeseries(self):
|
||||
# check cluster alive
|
||||
tdLog.printNoPrefix("======== test cluster alive: ")
|
||||
tdSql.checkDataLoop(0, 0, 1, "show cluster alive;", 20, 0.5)
|
||||
|
@ -144,6 +174,46 @@ class TDTestCase:
|
|||
tdSql.query("show db.alive;")
|
||||
tdSql.checkData(0, 0, 1)
|
||||
|
||||
# check timeseries
|
||||
tss_grant = 5
|
||||
for i in range(0, 3):
|
||||
tdLog.printNoPrefix(f"======== test timeseries: loop{i}")
|
||||
self.checkGrantsTimeSeries("initial check", tss_grant)
|
||||
tdSql.execute("create database if not exists db100")
|
||||
tdSql.execute("create table db100.stb100(ts timestamp, c0 int,c1 bigint,c2 int,c3 float,c4 double) tags(t0 bigint unsigned)")
|
||||
tdSql.execute("create table db100.ctb100 using db100.stb100 tags(100)")
|
||||
tdSql.execute("create table db100.ctb101 using db100.stb100 tags(101)")
|
||||
tdSql.execute("create table db100.ntb100 (ts timestamp, c0 int,c1 bigint,c2 int,c3 float,c4 double)")
|
||||
tdSql.execute("create table db100.ntb101 (ts timestamp, c0 int,c1 bigint,c2 int,c3 float,c4 double)")
|
||||
tss_grant += 20
|
||||
self.checkGrantsTimeSeries("create tables and check", tss_grant)
|
||||
tdSql.execute("alter table db100.stb100 add column c5 int")
|
||||
tdSql.execute("alter stable db100.stb100 add column c6 int")
|
||||
tdSql.execute("alter table db100.stb100 add tag t1 int")
|
||||
tss_grant += 4
|
||||
self.checkGrantsTimeSeries("add stable column and check", tss_grant)
|
||||
tdSql.execute("create table db100.ctb102 using db100.stb100 tags(102, 102)")
|
||||
tdSql.execute("alter table db100.ctb100 set tag t0=1000")
|
||||
tdSql.execute("alter table db100.ntb100 add column c5 int")
|
||||
tss_grant += 8
|
||||
self.checkGrantsTimeSeries("add ntable column and check", tss_grant)
|
||||
tdSql.execute("alter table db100.stb100 drop column c5")
|
||||
tdSql.execute("alter table db100.stb100 drop tag t1")
|
||||
tdSql.execute("alter table db100.ntb100 drop column c0")
|
||||
tdSql.execute("alter table db100.stb100 drop column c0")
|
||||
tss_grant -= 7
|
||||
self.checkGrantsTimeSeries("drop stb/ntb column and check", tss_grant)
|
||||
tdSql.execute("drop table db100.ctb100")
|
||||
tdSql.execute("drop table db100.ntb100")
|
||||
tss_grant -= 10
|
||||
self.checkGrantsTimeSeries("drop ctb/ntb and check", tss_grant)
|
||||
tdSql.execute("drop table db100.stb100")
|
||||
tss_grant -= 10
|
||||
self.checkGrantsTimeSeries("drop stb and check", tss_grant)
|
||||
tdSql.execute("drop database db100")
|
||||
tss_grant -= 5
|
||||
self.checkGrantsTimeSeries("drop database and check", tss_grant)
|
||||
|
||||
def s2_check_show_grants_ungranted(self):
|
||||
tdLog.printNoPrefix("======== test show grants ungranted: ")
|
||||
self.infoPath = os.path.join(self.workPath, ".clusterInfo")
|
||||
|
@ -221,7 +291,7 @@ class TDTestCase:
|
|||
# print(self.master_dnode.cfgDict)
|
||||
# keep the order of following steps
|
||||
self.s0_five_dnode_one_mnode()
|
||||
self.s1_check_alive()
|
||||
self.s1_check_timeseries()
|
||||
self.s2_check_show_grants_ungranted()
|
||||
self.s3_check_show_grants_granted()
|
||||
|
||||
|
|
Loading…
Reference in New Issue