enh: update timeseries

This commit is contained in:
kailixu 2023-09-24 07:52:27 +08:00
parent bc8ebf1562
commit 53a9459c20
9 changed files with 88 additions and 53 deletions

View File

@ -30,6 +30,8 @@ extern "C" {
#define GRANTS_COL_MAX_LEN 196
#endif
#define GRANT_HEART_BEAT_MIN 2
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,

View File

@ -44,7 +44,6 @@ int32_t dmRun();
*/
void dmStop();
#ifdef __cplusplus
}
#endif

View File

@ -22,8 +22,6 @@
#include "mndInt.h"
#define GRANT_HB_INTERVAL 300 // 300 seconds
int32_t mndInitGrant(SMnode * pMnode);
void mndCleanupGrant();
void grantParseParameter();

View File

@ -56,6 +56,7 @@ enum {
DND_ADD,
DND_DROP,
};
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
@ -550,6 +551,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
if (reboot) {
tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
}
for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);

View File

@ -262,13 +262,6 @@ static void *mndThreadFp(void *param) {
lastTime++;
taosMsleep(100);
if (mndGetStop(pMnode)) break;
if (atomic_val_compare_exchange_32(&tsGrantHBInterval, -GRANT_HB_INTERVAL, GRANT_HB_INTERVAL) ==
-GRANT_HB_INTERVAL ||
(lastTime % (tsGrantHBInterval * 10) == 0)) {
mndPullupGrant(pMnode);
}
if (lastTime % 10 != 0) continue;
int64_t sec = lastTime / 10;
@ -300,6 +293,10 @@ static void *mndThreadFp(void *param) {
mndPullupTelem(pMnode);
}
if (sec % tsGrantHBInterval == 0) {
mndPullupGrant(pMnode);
}
if (sec % tsUptimeInterval == 0) {
mndIncreaseUpTime(pMnode);
}

View File

@ -71,7 +71,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t deltaCtb, int32_t totalCols);
void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol);
int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, LRUHandle** pHandle);
struct SMeta {

View File

@ -697,24 +697,23 @@ int64_t metaGetTbNum(SMeta *pMeta) {
}
void metaUpdTimeSeriesNum(SMeta *pMeta) {
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
int64_t nCtbTimeSeries = 0;
vnodeGetTimeSeriesNum(pMeta->pVnode, &nCtbTimeSeries);
atomic_store_64(&pStats->numOfTimeSeries, nCtbTimeSeries);
if (vnodeGetTimeSeriesNum(pMeta->pVnode, &nCtbTimeSeries) == 0) {
atomic_store_64(&pMeta->pVnode->config.vndStats.numOfTimeSeries, nCtbTimeSeries);
}
}
static int64_t metaGetTimeSeriesNumImpl(SMeta *pMeta, bool forceUpd) {
static FORCE_INLINE metaGetTimeSeriesNumImpl(SMeta *pMeta, bool forceUpd) {
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
if (forceUpd || pStats->numOfTimeSeries < 0) {
if (forceUpd || pStats->numOfTimeSeries <= 0) {
metaUpdTimeSeriesNum(pMeta);
}
return pStats->numOfTimeSeries + pStats->numOfNTimeSeries;
}
// type: 1 report timeseries
// type: 1 reported timeseries
int64_t metaGetTimeSeriesNum(SMeta *pMeta, int type) {
int64_t nTimeSeries = metaGetTimeSeriesNumImpl(pMeta, false);
if (type == 1) {
@ -1532,18 +1531,19 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t
metaULock(pVnodeObj->pMeta);
if (numOfTables) *numOfTables = state.ctbNum;
if (numOfCols) *numOfCols = state.colNum;
assert(state.colNum > 0);
assert(state.ctbNum >= 0);
goto _exit;
}
// slow path: search TDB
int64_t ctbNum = 0;
vnodeGetCtbNum(pVnode, uid, &ctbNum);
metaULock(pVnodeObj->pMeta);
if (numOfTables) *numOfTables = ctbNum;
int32_t colNum = 0;
vnodeGetCtbNum(pVnode, uid, &ctbNum);
vnodeGetStbColumnNum(pVnode, uid, &colNum);
metaULock(pVnodeObj->pMeta);
if (numOfTables) *numOfTables = ctbNum;
if (numOfCols) *numOfCols = colNum;
state.uid = uid;
@ -1559,13 +1559,12 @@ _exit:
return code;
}
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t deltaCtb, int32_t totalCols) {
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol) {
SMetaStbStats stats = {0};
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
stats.ctbNum += deltaCtb;
if (totalCols > 0) stats.colNum = totalCols;
stats.colNum += deltaCol;
metaStatsCacheUpsert(pMeta, &stats);
}
}

View File

@ -28,7 +28,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid);
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
// opt ins_tables query
static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -198,9 +198,7 @@ static inline void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
#ifdef TD_ENTERPRISE
int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
if (abs(deltaTS) > tsTimeSeriesThreshold) {
tsem_post(&dmNotifySem);
}
if (deltaTS > tsTimeSeriesThreshold) tsem_post(&dmNotifySem);
#endif
}
@ -305,7 +303,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild);
metaDropTableByUid(pMeta, uid, NULL);
metaDropTableByUid(pMeta, uid, NULL, NULL);
}
// drop super table
@ -319,6 +317,8 @@ _drop_super_table:
metaULock(pMeta);
metaUpdTimeSeriesNum(pMeta);
_exit:
tdbFree(pKey);
tdbFree(pData);
@ -403,7 +403,13 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
// metaStatsCacheDrop(pMeta, nStbEntry.uid);
metaUpdateStbStats(pMeta, pReq->suid, 0, pReq->schemaRow.nCols);
int32_t deltaCol = pReq->schemaRow.nCols - oStbEntry.stbEntry.schemaRow.nCols;
if (deltaCol != 0) {
metaUpdateStbStats(pMeta, pReq->suid, 0, deltaCol);
int64_t ctbNum;
metaGetStbStats(pMeta, pReq->suid, &ctbNum, NULL);
pMeta->pVnode->config.vndStats.numOfTimeSeries += (ctbNum * deltaCol);
}
metaULock(pMeta);
@ -784,10 +790,9 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
#endif
++pStats->numOfCTables;
int32_t nCols = 0;
metaGetStbStats(pMeta->pVnode, me.ctbEntry.suid, 0, &nCols);
pStats->numOfTimeSeries = pStats->numOfCTables * (nCols - 1);
pStats->numOfTimeSeries += nCols - 1;
metaWLock(pMeta);
metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1, 0);
@ -840,6 +845,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
int nData = 0;
int rc = 0;
tb_uid_t uid;
tb_uid_t suid;
int type;
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
@ -850,9 +856,19 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
uid = *(tb_uid_t *)pData;
metaWLock(pMeta);
metaDropTableByUid(pMeta, uid, &type);
rc = metaDropTableByUid(pMeta, uid, &type, &suid);
metaULock(pMeta);
if (rc < 0) goto _exit;
if (type == TSDB_CHILD_TABLE) {
int32_t nCols = 0;
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
if (metaGetStbStats(pMeta->pVnode, suid, NULL, &nCols) == 0) {
pStats->numOfTimeSeries -= nCols - 1;
}
}
if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) {
taosArrayPush(tbUids, &uid);
}
@ -861,20 +877,46 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
*tbUid = uid;
}
_exit:
tdbFree(pData);
return 0;
return rc;
}
void metaDropTables(SMeta *pMeta, SArray *tbUids) {
if (taosArrayGetSize(tbUids) == 0) return;
int64_t ctbNum = 0;
SSHashObj *suidHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
metaWLock(pMeta);
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
metaDropTableByUid(pMeta, uid, NULL);
tb_uid_t suid = 0;
metaDropTableByUid(pMeta, uid, NULL, &suid);
if (suid != 0 && suidHash) {
int64_t *cnt = tSimpleHashGet(suidHash, &suid, sizeof(tb_uid_t));
if (cnt) {
ctbNum = *cnt + 1;
} else {
ctbNum = 1;
}
tSimpleHashPut(suidHash, &suid, sizeof(tb_uid_t), &ctbNum, sizeof(int64_t));
}
metaDebug("batch drop table:%" PRId64, uid);
}
metaULock(pMeta);
// update timeseries
void *pCtbNum = NULL;
int32_t iter = 0;
while ((pCtbNum = tSimpleHashIterate(suidHash, pCtbNum, &iter))) {
tb_uid_t *pSuid = tSimpleHashGetKey(pCtbNum, NULL);
int32_t nCols = 0;
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
if (metaGetStbStats(pMeta->pVnode, *pSuid, NULL, &nCols) == 0) {
pStats->numOfTimeSeries -= *(int64_t *)pCtbNum * (nCols - 1);
}
}
}
static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
@ -1009,7 +1051,7 @@ static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
}
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid) {
void *pData = NULL;
int nData = 0;
int rc = 0;
@ -1034,9 +1076,11 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
if (type) *type = e.type;
if (e.type == TSDB_CHILD_TABLE) {
if (pSuid) *pSuid = e.ctbEntry.suid;
void *tData = NULL;
int tLen = 0;
if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) {
STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = ((SUidIdxVal *)tData)[0].version};
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) {
@ -1093,23 +1137,18 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
if (e.type != TSDB_SUPER_TABLE) metaDeleteTtl(pMeta, &e);
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
if (e.type == TSDB_CHILD_TABLE) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn);
--pStats->numOfCTables;
int32_t nCols = 0;
metaGetStbStats(pMeta->pVnode, e.ctbEntry.suid, 0, &nCols);
pStats->numOfTimeSeries = pStats->numOfCTables * (nCols - 1);
--pMeta->pVnode->config.vndStats.numOfCTables;
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0);
metaUidCacheClear(pMeta, e.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
} else if (e.type == TSDB_NORMAL_TABLE) {
// drop schema.db (todo)
--pStats->numOfNTables;
pStats->numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1;
--pMeta->pVnode->config.vndStats.numOfNTables;
pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1;
} else if (e.type == TSDB_SUPER_TABLE) {
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn);
// drop schema.db (todo)
@ -1117,14 +1156,11 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
metaStatsCacheDrop(pMeta, uid);
metaUidCacheClear(pMeta, uid);
metaTbGroupCacheClear(pMeta, uid);
metaUpdTimeSeriesNum(pMeta);
--pStats->numOfSTables;
--pMeta->pVnode->config.vndStats.numOfSTables;
}
metaCacheDrop(pMeta, uid);
metaTimeSeriesNotifyCheck(pMeta);
tDecoderClear(&dc);
tdbFree(pData);
@ -1307,7 +1343,6 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
pSchema->nCols--;
--pMeta->pVnode->config.vndStats.numOfNTimeSeries;
metaTimeSeriesNotifyCheck(pMeta);
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
if (pColumn == NULL) {

View File

@ -554,7 +554,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
}
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 1);
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0);
if (pSW) {
*num = pSW->nCols;
tDeleteSchemaWrapper(pSW);