From 53a9459c206f4ad70ff205dc87646e1328de2796 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sun, 24 Sep 2023 07:52:27 +0800 Subject: [PATCH] enh: update timeseries --- include/common/tgrant.h | 2 + include/dnode/mgmt/dnode.h | 1 - source/dnode/mnode/impl/inc/mndGrant.h | 2 - source/dnode/mnode/impl/src/mndDnode.c | 5 ++ source/dnode/mnode/impl/src/mndMain.c | 11 ++-- source/dnode/vnode/src/inc/meta.h | 2 +- source/dnode/vnode/src/meta/metaQuery.c | 31 +++++---- source/dnode/vnode/src/meta/metaTable.c | 85 +++++++++++++++++-------- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- 9 files changed, 88 insertions(+), 53 deletions(-) diff --git a/include/common/tgrant.h b/include/common/tgrant.h index 46e09a56b6..31d34add24 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -30,6 +30,8 @@ extern "C" { #define GRANTS_COL_MAX_LEN 196 #endif +#define GRANT_HEART_BEAT_MIN 2 + typedef enum { TSDB_GRANT_ALL, TSDB_GRANT_TIME, diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index c9f2339d11..82823e3f57 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -44,7 +44,6 @@ int32_t dmRun(); */ void dmStop(); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndGrant.h b/source/dnode/mnode/impl/inc/mndGrant.h index a036bf6d7e..88f118cb8f 100644 --- a/source/dnode/mnode/impl/inc/mndGrant.h +++ b/source/dnode/mnode/impl/inc/mndGrant.h @@ -22,8 +22,6 @@ #include "mndInt.h" -#define GRANT_HB_INTERVAL 300 // 300 seconds - int32_t mndInitGrant(SMnode * pMnode); void mndCleanupGrant(); void grantParseParameter(); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 9c7d8ee90f..95a8f873cb 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -56,6 +56,7 @@ enum { DND_ADD, DND_DROP, }; + static int32_t mndCreateDefaultDnode(SMnode *pMnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); @@ -550,6 +551,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq); + if (reboot) { + tsGrantHBInterval = GRANT_HEART_BEAT_MIN; + } + for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) { SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index e00af5375d..1c87cde78a 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -262,13 +262,6 @@ static void *mndThreadFp(void *param) { lastTime++; taosMsleep(100); if (mndGetStop(pMnode)) break; - - if (atomic_val_compare_exchange_32(&tsGrantHBInterval, -GRANT_HB_INTERVAL, GRANT_HB_INTERVAL) == - -GRANT_HB_INTERVAL || - (lastTime % (tsGrantHBInterval * 10) == 0)) { - mndPullupGrant(pMnode); - } - if (lastTime % 10 != 0) continue; int64_t sec = lastTime / 10; @@ -300,6 +293,10 @@ static void *mndThreadFp(void *param) { mndPullupTelem(pMnode); } + if (sec % tsGrantHBInterval == 0) { + mndPullupGrant(pMnode); + } + if (sec % tsUptimeInterval == 0) { mndIncreaseUpTime(pMnode); } diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index b57e046ad8..c74ccf6c11 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -71,7 +71,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo); int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); -void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t deltaCtb, int32_t totalCols); +void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol); int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, LRUHandle** pHandle); struct SMeta { diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 1a9001ce2b..c9ce2a51b8 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -697,24 +697,23 @@ int64_t metaGetTbNum(SMeta *pMeta) { } void metaUpdTimeSeriesNum(SMeta *pMeta) { - SVnodeStats *pStats = &pMeta->pVnode->config.vndStats; - int64_t nCtbTimeSeries = 0; - - vnodeGetTimeSeriesNum(pMeta->pVnode, &nCtbTimeSeries); - atomic_store_64(&pStats->numOfTimeSeries, nCtbTimeSeries); + int64_t nCtbTimeSeries = 0; + if (vnodeGetTimeSeriesNum(pMeta->pVnode, &nCtbTimeSeries) == 0) { + atomic_store_64(&pMeta->pVnode->config.vndStats.numOfTimeSeries, nCtbTimeSeries); + } } -static int64_t metaGetTimeSeriesNumImpl(SMeta *pMeta, bool forceUpd) { +static FORCE_INLINE metaGetTimeSeriesNumImpl(SMeta *pMeta, bool forceUpd) { // sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column) SVnodeStats *pStats = &pMeta->pVnode->config.vndStats; - if (forceUpd || pStats->numOfTimeSeries < 0) { + if (forceUpd || pStats->numOfTimeSeries <= 0) { metaUpdTimeSeriesNum(pMeta); } return pStats->numOfTimeSeries + pStats->numOfNTimeSeries; } -// type: 1 report timeseries +// type: 1 reported timeseries int64_t metaGetTimeSeriesNum(SMeta *pMeta, int type) { int64_t nTimeSeries = metaGetTimeSeriesNumImpl(pMeta, false); if (type == 1) { @@ -1532,18 +1531,19 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t metaULock(pVnodeObj->pMeta); if (numOfTables) *numOfTables = state.ctbNum; if (numOfCols) *numOfCols = state.colNum; + assert(state.colNum > 0); + assert(state.ctbNum >= 0); goto _exit; } // slow path: search TDB int64_t ctbNum = 0; - vnodeGetCtbNum(pVnode, uid, &ctbNum); - - metaULock(pVnodeObj->pMeta); - if (numOfTables) *numOfTables = ctbNum; - int32_t colNum = 0; + vnodeGetCtbNum(pVnode, uid, &ctbNum); vnodeGetStbColumnNum(pVnode, uid, &colNum); + metaULock(pVnodeObj->pMeta); + + if (numOfTables) *numOfTables = ctbNum; if (numOfCols) *numOfCols = colNum; state.uid = uid; @@ -1559,13 +1559,12 @@ _exit: return code; } -void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t deltaCtb, int32_t totalCols) { +void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol) { SMetaStbStats stats = {0}; if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) { stats.ctbNum += deltaCtb; - if (totalCols > 0) stats.colNum = totalCols; - + stats.colNum += deltaCol; metaStatsCacheUpsert(pMeta, &stats); } } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index c10811083f..e95c11b0d9 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -28,7 +28,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry); -static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type); +static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid); static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey); // opt ins_tables query static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME); @@ -198,9 +198,7 @@ static inline void metaTimeSeriesNotifyCheck(SMeta *pMeta) { #ifdef TD_ENTERPRISE int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0); int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries; - if (abs(deltaTS) > tsTimeSeriesThreshold) { - tsem_post(&dmNotifySem); - } + if (deltaTS > tsTimeSeriesThreshold) tsem_post(&dmNotifySem); #endif } @@ -305,7 +303,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) { tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild); - metaDropTableByUid(pMeta, uid, NULL); + metaDropTableByUid(pMeta, uid, NULL, NULL); } // drop super table @@ -319,6 +317,8 @@ _drop_super_table: metaULock(pMeta); + metaUpdTimeSeriesNum(pMeta); + _exit: tdbFree(pKey); tdbFree(pData); @@ -403,7 +403,13 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { // metaStatsCacheDrop(pMeta, nStbEntry.uid); - metaUpdateStbStats(pMeta, pReq->suid, 0, pReq->schemaRow.nCols); + int32_t deltaCol = pReq->schemaRow.nCols - oStbEntry.stbEntry.schemaRow.nCols; + if (deltaCol != 0) { + metaUpdateStbStats(pMeta, pReq->suid, 0, deltaCol); + int64_t ctbNum; + metaGetStbStats(pMeta, pReq->suid, &ctbNum, NULL); + pMeta->pVnode->config.vndStats.numOfTimeSeries += (ctbNum * deltaCol); + } metaULock(pMeta); @@ -784,10 +790,9 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs #endif ++pStats->numOfCTables; - int32_t nCols = 0; metaGetStbStats(pMeta->pVnode, me.ctbEntry.suid, 0, &nCols); - pStats->numOfTimeSeries = pStats->numOfCTables * (nCols - 1); + pStats->numOfTimeSeries += nCols - 1; metaWLock(pMeta); metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1, 0); @@ -840,6 +845,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi int nData = 0; int rc = 0; tb_uid_t uid; + tb_uid_t suid; int type; rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData); @@ -850,9 +856,19 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi uid = *(tb_uid_t *)pData; metaWLock(pMeta); - metaDropTableByUid(pMeta, uid, &type); + rc = metaDropTableByUid(pMeta, uid, &type, &suid); metaULock(pMeta); + if (rc < 0) goto _exit; + + if (type == TSDB_CHILD_TABLE) { + int32_t nCols = 0; + SVnodeStats *pStats = &pMeta->pVnode->config.vndStats; + if (metaGetStbStats(pMeta->pVnode, suid, NULL, &nCols) == 0) { + pStats->numOfTimeSeries -= nCols - 1; + } + } + if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) { taosArrayPush(tbUids, &uid); } @@ -861,20 +877,46 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi *tbUid = uid; } +_exit: tdbFree(pData); - return 0; + return rc; } void metaDropTables(SMeta *pMeta, SArray *tbUids) { if (taosArrayGetSize(tbUids) == 0) return; + int64_t ctbNum = 0; + SSHashObj *suidHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + metaWLock(pMeta); for (int i = 0; i < taosArrayGetSize(tbUids); ++i) { tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i); - metaDropTableByUid(pMeta, uid, NULL); + tb_uid_t suid = 0; + metaDropTableByUid(pMeta, uid, NULL, &suid); + if (suid != 0 && suidHash) { + int64_t *cnt = tSimpleHashGet(suidHash, &suid, sizeof(tb_uid_t)); + if (cnt) { + ctbNum = *cnt + 1; + } else { + ctbNum = 1; + } + tSimpleHashPut(suidHash, &suid, sizeof(tb_uid_t), &ctbNum, sizeof(int64_t)); + } metaDebug("batch drop table:%" PRId64, uid); } metaULock(pMeta); + + // update timeseries + void *pCtbNum = NULL; + int32_t iter = 0; + while ((pCtbNum = tSimpleHashIterate(suidHash, pCtbNum, &iter))) { + tb_uid_t *pSuid = tSimpleHashGetKey(pCtbNum, NULL); + int32_t nCols = 0; + SVnodeStats *pStats = &pMeta->pVnode->config.vndStats; + if (metaGetStbStats(pMeta->pVnode, *pSuid, NULL, &nCols) == 0) { + pStats->numOfTimeSeries -= *(int64_t *)pCtbNum * (nCols - 1); + } + } } static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) { @@ -1009,7 +1051,7 @@ static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) { return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx); } -static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { +static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid) { void *pData = NULL; int nData = 0; int rc = 0; @@ -1034,9 +1076,11 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { if (type) *type = e.type; if (e.type == TSDB_CHILD_TABLE) { + if (pSuid) *pSuid = e.ctbEntry.suid; void *tData = NULL; int tLen = 0; + if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) { STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = ((SUidIdxVal *)tData)[0].version}; if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) { @@ -1093,23 +1137,18 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { if (e.type != TSDB_SUPER_TABLE) metaDeleteTtl(pMeta, &e); - SVnodeStats *pStats = &pMeta->pVnode->config.vndStats; if (e.type == TSDB_CHILD_TABLE) { tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn); - --pStats->numOfCTables; - int32_t nCols = 0; - metaGetStbStats(pMeta->pVnode, e.ctbEntry.suid, 0, &nCols); - pStats->numOfTimeSeries = pStats->numOfCTables * (nCols - 1); - + --pMeta->pVnode->config.vndStats.numOfCTables; metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0); metaUidCacheClear(pMeta, e.ctbEntry.suid); metaTbGroupCacheClear(pMeta, e.ctbEntry.suid); } else if (e.type == TSDB_NORMAL_TABLE) { // drop schema.db (todo) - --pStats->numOfNTables; - pStats->numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1; + --pMeta->pVnode->config.vndStats.numOfNTables; + pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1; } else if (e.type == TSDB_SUPER_TABLE) { tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn); // drop schema.db (todo) @@ -1117,14 +1156,11 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { metaStatsCacheDrop(pMeta, uid); metaUidCacheClear(pMeta, uid); metaTbGroupCacheClear(pMeta, uid); - metaUpdTimeSeriesNum(pMeta); - --pStats->numOfSTables; + --pMeta->pVnode->config.vndStats.numOfSTables; } metaCacheDrop(pMeta, uid); - metaTimeSeriesNotifyCheck(pMeta); - tDecoderClear(&dc); tdbFree(pData); @@ -1307,7 +1343,6 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl pSchema->nCols--; --pMeta->pVnode->config.vndStats.numOfNTimeSeries; - metaTimeSeriesNotifyCheck(pMeta); break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: if (pColumn == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index c8b2a797fa..01292f33e4 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -554,7 +554,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { } int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) { - SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 1); + SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0); if (pSW) { *num = pSW->nCols; tDeleteSchemaWrapper(pSW);