enh: timeseries update optimize

This commit is contained in:
kailixu 2023-09-14 20:16:02 +08:00
parent 62c4113f82
commit 449092c5dc
7 changed files with 20 additions and 21 deletions

View File

@ -177,7 +177,6 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_FETCH_TIMER, "grant-fetch-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)

View File

@ -17,6 +17,7 @@
#include "dmInt.h" #include "dmInt.h"
#include "thttp.h" #include "thttp.h"
int8_t tsNeedUpdStatus = 0;
static void *dmStatusThreadFp(void *param) { static void *dmStatusThreadFp(void *param) {
SDnodeMgmt *pMgmt = param; SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs(); int64_t lastTime = taosGetTimestampMs();
@ -28,13 +29,13 @@ static void *dmStatusThreadFp(void *param) {
int64_t upTime = 0; int64_t upTime = 0;
while (1) { while (1) {
taosMsleep(200); taosMsleep(100);
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
int64_t curTime = taosGetTimestampMs(); int64_t curTime = taosGetTimestampMs();
if (curTime < lastTime) lastTime = curTime; if (curTime < lastTime) lastTime = curTime;
float interval = (curTime - lastTime) / 1000.0f; float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsStatusInterval) { if (atomic_val_compare_exchange_8(&tsNeedUpdStatus, 1, 0) == 1 || interval >= tsStatusInterval) {
dmSendStatusReq(pMgmt); dmSendStatusReq(pMgmt);
lastTime = curTime; lastTime = curTime;

View File

@ -51,6 +51,7 @@ enum {
DND_CONN_ACTIVE_CODE, DND_CONN_ACTIVE_CODE,
}; };
extern int32_t mndUpdateClusterInfo(SRpcMsg *pReq);
static int32_t mndCreateDefaultDnode(SMnode *pMnode); static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
@ -525,6 +526,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes; bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged; bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged;
int64_t nDiffTimeSeries = 0;
const STraceId *trace = &pReq->info.traceId; const STraceId *trace = &pReq->info.traceId;
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
@ -535,6 +537,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId); SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
if (pVgroup != NULL) { if (pVgroup != NULL) {
nDiffTimeSeries = pVload->numOfTimeSeries - pVgroup->numOfTimeSeries;
if (pVload->syncState == TAOS_SYNC_STATE_LEADER) { if (pVload->syncState == TAOS_SYNC_STATE_LEADER) {
pVgroup->cacheUsage = pVload->cacheUsage; pVgroup->cacheUsage = pVload->cacheUsage;
pVgroup->numOfCachedTables = pVload->numOfCachedTables; pVgroup->numOfCachedTables = pVload->numOfCachedTables;
@ -662,9 +665,16 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
_OVER: _OVER:
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
taosArrayDestroy(statusReq.pVloads); taosArrayDestroy(statusReq.pVloads);
if (nDiffTimeSeries > 0) {
mndUpdateClusterInfo(pReq);
}
return code; return code;
} }
#ifndef TD_ENTERPRISE
int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; }
#endif
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) { static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
int32_t code = -1; int32_t code = -1;
SSdbRaw *pRaw = NULL; SSdbRaw *pRaw = NULL;

View File

@ -117,12 +117,10 @@ static int32_t mndRetrieveGrant(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
} }
static int32_t mndProcessGrantHB(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } static int32_t mndProcessGrantHB(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; }
static int32_t mndProcessGrantFetch(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; }
int32_t mndInitGrant(SMnode *pMnode) { int32_t mndInitGrant(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_GRANTS, mndRetrieveGrant); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_GRANTS, mndRetrieveGrant);
mndSetMsgHandle(pMnode, TDMT_MND_GRANT_HB_TIMER, mndProcessGrantHB); mndSetMsgHandle(pMnode, TDMT_MND_GRANT_HB_TIMER, mndProcessGrantHB);
mndSetMsgHandle(pMnode, TDMT_MND_GRANT_FETCH_TIMER, mndProcessGrantFetch);
return 0; return 0;
} }

View File

@ -175,17 +175,6 @@ static void mndPullupGrant(SMnode *pMnode) {
} }
} }
static void mndFetchGrant(SMnode *pMnode) {
mTrace("fetch grant msg");
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GRANT_FETCH_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9529};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
static void mndIncreaseUpTime(SMnode *pMnode) { static void mndIncreaseUpTime(SMnode *pMnode) {
mTrace("increate uptime"); mTrace("increate uptime");
int32_t contLen = 0; int32_t contLen = 0;
@ -304,10 +293,6 @@ static void *mndThreadFp(void *param) {
mndPullupTelem(pMnode); mndPullupTelem(pMnode);
} }
if (sec % tsGrantFetchInterval == 0) {
mndFetchGrant(pMnode);
}
if (sec % tsGrantHBInterval == 0) { if (sec % tsGrantHBInterval == 0) {
mndPullupGrant(pMnode); mndPullupGrant(pMnode);
} }

View File

@ -700,7 +700,7 @@ int64_t metaGetTbNum(SMeta *pMeta) {
int64_t metaGetTimeSeriesNum(SMeta *pMeta) { int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column) // sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
int64_t nTables = metaGetTbNum(pMeta); int64_t nTables = metaGetTbNum(pMeta);
if (nTables - pMeta->pVnode->config.vndStats.numOfCmprTables > 100 || if (nTables - pMeta->pVnode->config.vndStats.numOfCmprTables > 0 ||
pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 || pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 ||
++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) { ++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) {
int64_t num = 0; int64_t num = 0;

View File

@ -15,6 +15,8 @@
#include "meta.h" #include "meta.h"
extern int8_t tsNeedUpdStatus;
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
@ -788,6 +790,8 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
if (metaHandleEntry(pMeta, &me) < 0) goto _err; if (metaHandleEntry(pMeta, &me) < 0) goto _err;
atomic_val_compare_exchange_8(&tsNeedUpdStatus, 0, 1);
if (pMetaRsp) { if (pMetaRsp) {
*pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp)); *pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
@ -1094,6 +1098,8 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
--pMeta->pVnode->config.vndStats.numOfSTables; --pMeta->pVnode->config.vndStats.numOfSTables;
} }
atomic_val_compare_exchange_8(&tsNeedUpdStatus, 0, 1);
metaCacheDrop(pMeta, uid); metaCacheDrop(pMeta, uid);
tDecoderClear(&dc); tDecoderClear(&dc);