fix: timeseries statistics and test case

kailixu 2025-02-13 18:55:28 +08:00
parent 61a606a410
commit e3aaab405d
4 changed files with 87 additions and 28 deletions

@@ -10,14 +10,16 @@
 #include "meta.h"
+extern SDmNotifyHandle dmNotifyHdl;
 int32_t metaCloneEntry(const SMetaEntry *pEntry, SMetaEntry **ppEntry);
 void metaCloneEntryFree(SMetaEntry **ppEntry);
 void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
 int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
 int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
-void metaTimeSeriesNotifyCheck(SMeta *pMeta);
 int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
+static void metaTimeSeriesNotifyCheck(SMeta *pMeta);
 static int32_t metaGetChildUidsOfSuperTable(SMeta *pMeta, tb_uid_t suid, SArray **childList);
 static int32_t metaFetchTagIdxKey(SMeta *pMeta, const SMetaEntry *pEntry, const SSchema *pTagColumn,
                                   STagIdxKey **ppTagIdxKey, int32_t *pTagIdxKeySize);
@@ -990,6 +992,20 @@ static int32_t metaTtlIdxDelete(SMeta *pMeta, const SMetaHandleParam *pParam) {
   return code;
 }
 
+static void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
+#if defined(TD_ENTERPRISE)
+  int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
+  int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
+  if (deltaTS > tsTimeSeriesThreshold) {
+    if (0 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 2)) {
+      if (tsem_post(&dmNotifyHdl.sem) != 0) {
+        metaError("vgId:%d, failed to post semaphore, errno:%d", TD_VID(pMeta->pVnode), errno);
+      }
+    }
+  }
+#endif
+}
+
 static int32_t (*metaTableOpFn[META_TABLE_MAX][META_TABLE_OP_MAX])(SMeta *pMeta, const SMetaHandleParam *pParam) =
     {
         [META_ENTRY_TABLE] =
@@ -1139,6 +1155,7 @@ static int32_t metaHandleNormalTableCreate(SMeta *pMeta, const SMetaEntry *pEntr
         metaError("vgId:%d, failed to create table:%s since %s", TD_VID(pMeta->pVnode), pEntry->name, tstrerror(rc));
       }
     }
+    metaTimeSeriesNotifyCheck(pMeta);
   } else {
     metaErr(TD_VID(pMeta->pVnode), code);
   }
@@ -1228,7 +1245,7 @@ static int32_t metaHandleChildTableCreate(SMeta *pMeta, const SMetaEntry *pEntry
   } else {
     metaErr(TD_VID(pMeta->pVnode), code);
   }
+  metaTimeSeriesNotifyCheck(pMeta);
   metaFetchEntryFree(&pSuperEntry);
   return code;
 }
@@ -1673,7 +1690,7 @@ static int32_t metaHandleSuperTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry
     tsdbCacheInvalidateSchema(pTsdb, pEntry->uid, -1, pEntry->stbEntry.schemaRow.version);
   }
+  metaTimeSeriesNotifyCheck(pMeta);
   metaFetchEntryFree(&pOldEntry);
   return code;
 }
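Note on the handshake used by the new metaTimeSeriesNotifyCheck above: once the gap between the current time-series count and the last reported count exceeds tsTimeSeriesThreshold, a compare-and-swap on dmNotifyHdl.state gates a single tsem_post so the reporting thread is woken only once per threshold crossing. Below is a minimal standalone sketch of that gate-then-post pattern using C11 atomics and POSIX semaphores; the type and function names, state values, and threshold are illustrative assumptions, not TDengine's implementation.

/* Sketch only: mimics the "check delta, CAS the state, post the semaphore once"
 * shape of metaTimeSeriesNotifyCheck, with hypothetical names. */
#include <semaphore.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
  atomic_char state; /* assumed states: 0 = idle, 1 = armed, 2 = notify pending */
  sem_t       sem;   /* wakes the background reporter */
} notify_handle_t;

static const int64_t kThreshold = 50; /* stand-in for tsTimeSeriesThreshold */

static void notify_check(notify_handle_t *h, int64_t nTotal, int64_t nReported) {
  if (nTotal - nReported <= kThreshold) return;
  char expected = 1;
  /* Only the caller that wins the 1 -> 2 transition posts, so a burst of
   * table creations yields a single wakeup. */
  if (atomic_compare_exchange_strong(&h->state, &expected, 2)) {
    if (sem_post(&h->sem) != 0) perror("sem_post");
  }
}

int main(void) {
  notify_handle_t h;
  atomic_init(&h.state, 1);
  sem_init(&h.sem, 0, 0);
  notify_check(&h, 120, 10); /* delta 110 > 50: posts once      */
  notify_check(&h, 130, 10); /* state already 2: no second post */
  int v = 0;
  sem_getvalue(&h.sem, &v);
  printf("semaphore value: %d (expect 1)\n", v);
  sem_destroy(&h.sem);
  return 0;
}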

@@ -261,20 +261,6 @@ _exception:
   return code;
 }
 
-void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
-#if defined(TD_ENTERPRISE)
-  int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
-  int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
-  if (deltaTS > tsTimeSeriesThreshold) {
-    if (0 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 2)) {
-      if (tsem_post(&dmNotifyHdl.sem) != 0) {
-        metaError("vgId:%d, failed to post semaphore, errno:%d", TD_VID(pMeta->pVnode), errno);
-      }
-    }
-  }
-#endif
-}
-
 static int32_t metaDropTables(SMeta *pMeta, SArray *tbUids) {
   int32_t code = 0;
   if (taosArrayGetSize(tbUids) == 0) return TSDB_CODE_SUCCESS;

@@ -378,10 +378,6 @@ static int32_t metaCreateChildTable(SMeta *pMeta, int64_t version, SVCreateTbReq
              pReq->ctb.suid, version);
   }
   return code;
-#if 1
-  metaTimeSeriesNotifyCheck(pMeta);
-#endif
 }
 
 // Drop Child Table
@@ -489,9 +485,6 @@ static int32_t metaCreateNormalTable(SMeta *pMeta, int64_t version, SVCreateTbRe
              __func__, __FILE__, __LINE__, tstrerror(code), pReq->uid, pReq->name, version);
   }
   TAOS_RETURN(code);
-#if 1
-  metaTimeSeriesNotifyCheck(pMeta);
-#endif
 }
 
 // Drop Normal Table

@@ -135,8 +135,38 @@ class TDTestCase:
         port = dnode.cfgDict["serverPort"]
         config_dir = dnode.cfgDir
         return taos.connect(host=host, port=int(port), config=config_dir)
 
+    def getShowGrantsTimeSeries(self, maxRetry=10):
+        for nRetry in range(maxRetry):
+            tdSql.query("show grants")
+            timeseries = tdSql.queryResult[0][5]
+            tdSql.query("show grants full")
+            full_timeseries = tdSql.queryResult[1][3]
+            if timeseries == full_timeseries:
+                return int(timeseries.split('/')[0])
+            else:
+                tdLog.info(f"timeseries: {timeseries} != full_timeseries: {full_timeseries}, retry: {nRetry}")
+                time.sleep(1)
+        raise Exception(f"Timeseries not equal within {maxRetry} seconds")
+
+    def getTablesTimeSeries(self):
+        tdSql.query(f"select cast(sum(columns-1) as int) as tss from information_schema.ins_tables where db_name not in ('information_schema', 'performance_schema', 'audit')")
+        return int(tdSql.queryResult[0][0])
+
+    def checkGrantsTimeSeries(self, prompt="", nExpectedTimeSeries=0, maxRetry=10):
+        for nRetry in range(maxRetry):
+            tss_grant = self.getShowGrantsTimeSeries()
+            if tss_grant == nExpectedTimeSeries:
+                tss_table = self.getTablesTimeSeries()
+                if tss_grant == tss_table:
+                    tdLog.info(f"{prompt}: tss_grant: {tss_grant} == tss_table: {tss_table}")
+                    return
+                else:
+                    raise Exception(f"{prompt}: tss_grant: {tss_grant} != tss_table: {tss_table}")
+            time.sleep(1)
+        raise Exception(f"{prompt}: tss_grant: {tss_grant} != nExpectedTimeSeries: {nExpectedTimeSeries}")
+
-    def s1_check_alive(self):
+    def s1_check_timeseries(self):
         # check cluster alive
         tdLog.printNoPrefix("======== test cluster alive: ")
         tdSql.checkDataLoop(0, 0, 1, "show cluster alive;", 20, 0.5)
@@ -144,6 +174,39 @@
         tdSql.query("show db.alive;")
         tdSql.checkData(0, 0, 1)
+
+        # check timeseries
+        tss_grant = 5
+        self.checkGrantsTimeSeries("initial check", tss_grant)
+        tdSql.execute("create database if not exists db100")
+        tdSql.execute("create table db100.stb100(ts timestamp, c0 int,c1 bigint,c2 int,c3 float,c4 double) tags(t0 bigint unsigned)")
+        tdSql.execute("create table db100.ctb100 using db100.stb100 tags(100)")
+        tdSql.execute("create table db100.ctb101 using db100.stb100 tags(101)")
+        tdSql.execute("create table db100.ntb100 (ts timestamp, c0 int,c1 bigint,c2 int,c3 float,c4 double)")
+        tdSql.execute("create table db100.ntb101 (ts timestamp, c0 int,c1 bigint,c2 int,c3 float,c4 double)")
+        tss_grant += 20
+        self.checkGrantsTimeSeries("create tables and check", tss_grant)
+        tdSql.execute("alter table db100.stb100 add column c5 int")
+        tdSql.execute("alter stable db100.stb100 add column c6 int")
+        tss_grant += 4
+        self.checkGrantsTimeSeries("alter table column/tag and tss_grant", tss_grant)
+
+        # tdSql.execute("alter table db100.stb100 add tag t1 int")
+        # tdSql.execute("create table db100.ctb102 using db100.stb100 tags(102, 102)")
+        # tdSql.execute("alter table db100.ctb100 set tag t0=1000")
+        # tdSql.execute("alter table db100.ntb100 add column c5 int")
+        # tss_grant += 12
+        # self.checkGrantsTimeSeries("alter table column/tag and tss_grant", tss_grant)
+        # tdSql.execute("drop table db100.ctb100")
+        # tdSql.execute("drop table db100.ntb100")
+        # tss_grant -= 13
+        # self.checkGrantsTimeSeries("drop ctb/ntb and check: tss_grant", tss_grant)
+        # tdSql.execute("drop table db100.stb100")
+        # tss_grant -= 14
+        # self.checkGrantsTimeSeries("drop stb and check: tss_grant", tss_grant)
+        # tdSql.execute("drop database db100")
+        # tss_grant -= 7
+        # self.checkGrantsTimeSeries("drop database and check: tss_grant", tss_grant)
 
     def s2_check_show_grants_ungranted(self):
         tdLog.printNoPrefix("======== test show grants ungranted: ")
         self.infoPath = os.path.join(self.workPath, ".clusterInfo")
@@ -221,9 +284,9 @@
         # print(self.master_dnode.cfgDict)
         # keep the order of following steps
         self.s0_five_dnode_one_mnode()
-        self.s1_check_alive()
-        self.s2_check_show_grants_ungranted()
-        self.s3_check_show_grants_granted()
+        self.s1_check_timeseries()
+        # self.s2_check_show_grants_ungranted()
+        # self.s3_check_show_grants_granted()
 
     def stop(self):
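A note on the expected counts in s1_check_timeseries: the helper query added above (select cast(sum(columns-1) as int) ... from information_schema.ins_tables) treats every table as contributing one time series per non-timestamp column, which is where the += 20 and += 4 steps come from. Below is a small sketch of that arithmetic, with the table and column counts read off the test's create/alter statements; the initial value of 5 is simply taken as the test's starting baseline.

/* Reproduces the test's expected deltas under the "columns - 1 per table" rule. */
#include <stdio.h>

int main(void) {
  int stb_cols = 6;  /* db100.stb100: ts, c0..c4          */
  int ntb_cols = 6;  /* db100.ntb100 / ntb101: ts, c0..c4 */
  int n_child  = 2;  /* ctb100, ctb101                    */
  int n_normal = 2;  /* ntb100, ntb101                    */

  /* create statements: 2 * (6 - 1) child + 2 * (6 - 1) normal = 20 */
  int created = n_child * (stb_cols - 1) + n_normal * (ntb_cols - 1);
  printf("tss_grant += %d after the create statements\n", created);

  /* adding c5 and c6 to the super table: each new column adds one
   * time series per child table -> 2 * 2 = 4 */
  int new_cols = 2;
  printf("tss_grant += %d after the alter statements\n", new_cols * n_child);
  return 0;
}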