From 6698861c8bd0fc795022619ba97b802e02ef92ee Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 19 Sep 2023 08:29:51 +0800 Subject: [PATCH] chore: timeseries distribute --- include/common/tmsg.h | 16 +++ include/common/tmsgdef.h | 2 + include/dnode/mgmt/dnode.h | 1 + source/common/src/tmsg.c | 46 +++++++++ source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 2 + source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 105 +++++++++++++++++++- source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 2 +- source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 18 +--- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/node_util/inc/dmUtil.h | 2 + source/dnode/mnode/impl/src/mndDnode.c | 16 +-- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/meta/metaTable.c | 17 +++- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- 14 files changed, 201 insertions(+), 30 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9d1a8a9189..e64314c723 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1511,6 +1511,22 @@ int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); void tFreeSStatusReq(SStatusReq* pReq); +typedef struct { + int32_t vgId; + int64_t nTimeSeries; +} SDndNotifyInfo; + +int32_t dmProcessNotifyReq(SDndNotifyInfo* pInfo); + +typedef struct { + int32_t nVgroup; + SDndNotifyInfo* payload; +} SNotifyReq; + +int32_t tSerializeSNotifyReq(void* buf, int32_t bufLen, SNotifyReq* pReq); +int32_t tDeserializeSNotifyReq(void* buf, int32_t bufLen, SNotifyReq* pReq); +void tFreeSNotifyReq(SNotifyReq* pReq); + typedef struct { int32_t dnodeId; int64_t clusterId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index dd9a1af67c..39a36a248b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -179,6 +179,8 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_NOTIFY, "notify", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 82823e3f57..c9f2339d11 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -44,6 +44,7 @@ int32_t dmRun(); */ void dmStop(); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2285d0df23..457f450945 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1035,6 +1035,52 @@ int32_t tDeserializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextR return 0; } +int32_t tSerializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI32(&encoder, pReq->nVgroup) < 0) return -1; + for (int32_t i = 0; i < pReq->nVgroup; ++i) { + if (tEncodeI32(&encoder, (pReq->payload + i)->vgId) < 0) return -1; + if (tEncodeI64(&encoder, (pReq->payload + i)->nTimeSeries) < 0) return -1; + } + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI32(&decoder, &pReq->nVgroup) < 0) return -1; + + pReq->payload = taosMemoryMalloc(pReq->nVgroup * (sizeof(SDndNotifyInfo))); + if (!pReq->payload) return -1; + + for (int32_t i = 0; i < pReq->nVgroup; ++i) { + if (tDecodeI32(&decoder, &((pReq->payload + i)->vgId)) < 0) return -1; + if (tDecodeI64(&decoder, &((pReq->payload + i)->nTimeSeries)) < 0) return -1; + } + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + +void tFreeSNotifyReq(SNotifyReq *pReq) { + if (pReq) { + taosMemoryFreeClear(pReq->payload); + } +} + int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 35947c001b..0cf2c4f7b8 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -50,6 +50,8 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg); +int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg); +int32_t dmStartNotify(SDnodeMgmt *pMgmt); // dmWorker.c int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index a56387079f..bc53827222 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -130,11 +130,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { taosThreadRwlockUnlock(&pMgmt->pData->lock); SMonVloadInfo vinfo = {0}; - (*pMgmt->getVnodeLoadsFp)(&vinfo); + (*pMgmt->getVnodeLoadsFp)(&vinfo); req.pVloads = vinfo.pVloads; SMonMloadInfo minfo = {0}; - (*pMgmt->getMnodeLoadsFp)(&minfo); + (*pMgmt->getMnodeLoadsFp)(&minfo); req.mload = minfo.load; (*pMgmt->getQnodeLoadsFp)(&req.qload); @@ -198,7 +198,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { SServerStatusRsp statusRsp = {0}; SMonMloadInfo minfo = {0}; - (*pMgmt->getMnodeLoadsFp)(&minfo); + (*pMgmt->getMnodeLoadsFp)(&minfo); if (minfo.isMnode && (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) { pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; @@ -207,7 +207,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { } SMonVloadInfo vinfo = {0}; - (*pMgmt->getVnodeLoadsFp)(&vinfo); + (*pMgmt->getVnodeLoadsFp)(&vinfo); for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) { SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i); if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) { @@ -395,6 +395,7 @@ SArray *dmGetMsgHandles() { // Requests handled by MNODE if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; code = 0; @@ -407,3 +408,99 @@ _OVER: return pArray; } } + +#ifndef TD_ENTERPRISE +int32_t dmStartNotify(SDnodeMgmt *pMgmt) { return 0; } +int32_t dmProcessNotifyReq(SDndNotifyInfo *pInfo) { return 0; } +#else +static SHashObj *tsDndNotifyInfo = NULL; +static int64_t tsTimeSeries = 0; +static int8_t tsDndNotifyInitLock = 0; +static int8_t tsDndNotifyLock = 0; +static SDnodeMgmt *pSDnodeMgmt = NULL; + +int32_t dmStartNotify(SDnodeMgmt *pMgmt) { + pSDnodeMgmt = pMgmt; + return 0; +} + +int32_t dmProcessNotifyReq(SDndNotifyInfo *pInfo) { + if (atomic_load_8(&tsDndNotifyInitLock) != 2) { + if (atomic_val_compare_exchange_8(&tsDndNotifyInitLock, 0, 1) == 0) { + tsDndNotifyInfo = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + + // fetch the latest tsTimeSeries of vgroups in current dnode + atomic_store_8(&tsDndNotifyInitLock, 2); + } else { + int32_t nLoops = 0; + while (atomic_load_8(&tsDndNotifyInitLock) != 2) { + if (++nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } + } + } + + if (atomic_val_compare_exchange_8(&tsDndNotifyLock, 0, 1) != 0) { + return 0; + } + + int64_t lastTimeSeries = atomic_load_64(&tsTimeSeries); + + int64_t *val = NULL; + if ((val = taosHashGet(tsDndNotifyInfo, &pInfo->vgId, sizeof(pInfo->vgId)))) { + if (*val != pInfo->nTimeSeries) { + assert(*val > 0); + atomic_add_fetch_64(&tsTimeSeries, pInfo->nTimeSeries - *val); + taosHashPut(tsDndNotifyInfo, &pInfo->vgId, sizeof(pInfo->vgId), &pInfo->nTimeSeries, sizeof(pInfo->nTimeSeries)); + } + } else { + atomic_add_fetch_64(&tsTimeSeries, pInfo->nTimeSeries); + taosHashPut(tsDndNotifyInfo, &pInfo->vgId, sizeof(pInfo->vgId), &pInfo->nTimeSeries, sizeof(pInfo->nTimeSeries)); + } + + if (atomic_load_64(&tsTimeSeries) - lastTimeSeries > 100) { + int32_t hashSize = taosHashGetSize(tsDndNotifyInfo); + SNotifyReq req = {.nVgroup = hashSize}; + req.payload = taosMemoryMalloc(sizeof(SDndNotifyInfo) * hashSize); + + int64_t *nVal = NULL; + int32_t iter = 0; + + int64_t nnn = 0; + size_t kLen = sizeof(int32_t); + while ((nVal = taosHashIterate(tsDndNotifyInfo, nVal)) && iter < hashSize) { + int32_t *nVgId = taosHashGetKey(nVal, &kLen); + (req.payload + iter)->vgId = *nVgId; + (req.payload + iter)->nTimeSeries = *nVal; + ++iter; + } + // assert(nnn < 1000000); + + int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req); + if (contLen <= 0) return -1; + void *pHead = rpcMallocCont(contLen); + if (!pHead) { + tFreeSNotifyReq(&req); + atomic_store_8(&tsDndNotifyLock, 0); + return -1; + } + tSerializeSNotifyReq(pHead, contLen, &req); + tFreeSNotifyReq(&req); + + SRpcMsg rpcMsg = {.pCont = pHead, + .contLen = contLen, + .msgType = TDMT_MND_NOTIFY, + .info.ahandle = NULL, + .info.refId = 0, + .info.noResp = 1}; + + SEpSet epSet = {0}; + dmGetMnodeEpSet(pSDnodeMgmt->pData, &epSet); + rpcSendRequest(pSDnodeMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL); + } + atomic_store_8(&tsDndNotifyLock, 0); + return 0; +} +#endif diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index cd692090c4..2f232a995f 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -26,7 +26,7 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { if (dmStartCrashReportThread(pMgmt) != 0) { return -1; } - if (dmStartNotifyThread(pMgmt) != 0) { + if(dmStartNotify(pMgmt) != 0) { return -1; } return 0; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 08816749bf..9ddc64358a 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -133,21 +133,6 @@ static void *dmCrashReportThreadFp(void *param) { return NULL; } - -int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { - TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) { - dError("failed to create status thread since %s", strerror(errno)); - return -1; - } - - taosThreadAttrDestroy(&thAttr); - tmsgReportStartup("dnode-status", "initialized"); - return 0; -} - int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); @@ -266,6 +251,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { case TDMT_MND_GRANT: code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg); break; + case TDMT_MND_GRANT_NOTIFY: + code = dmProcessGrantNotify(NULL, pMsg); + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; dGError("msg:%p, not processed in mgmt queue", pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 42eb89b5fb..3fc3ca4cea 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -178,6 +178,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_NOTIFY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index ad87bc91c6..8d506563ec 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -92,6 +92,8 @@ typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg); +int32_t dmProcessNotifyReq(SDndNotifyInfo *pInfo); + typedef struct { int32_t dnodeId; int64_t clusterId; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index ec6d547795..8f9d37187b 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -79,6 +79,14 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue); +#ifndef TD_ENTERPRISE +static int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; } +static int32_t mndProcessNotifyReq(SRpcMsg *pReq) { return 0; } +#else +int32_t mndUpdateClusterInfo(SRpcMsg *pReq); +int32_t mndProcessNotifyReq(SRpcMsg *pReq); +#endif + int32_t mndInitDnode(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_DNODE, @@ -96,6 +104,7 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq); mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp); mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq); + mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq); mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq); mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq); mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq); @@ -480,12 +489,6 @@ static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) { return stateChanged; } -#ifndef TD_ENTERPRISE -static int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; } -#else -int32_t mndUpdateClusterInfo(SRpcMsg *pReq); -#endif - static int32_t mndProcessStatusReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStatusReq statusReq = {0}; @@ -537,6 +540,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); int64_t curMs = taosGetTimestampMs(); + int64_t nDiffTimeSeries = 0; bool online = mndIsDnodeOnline(pDnode, curMs); bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer); bool reboot = (pDnode->rebootTime != statusReq.rebootTime); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 5ae257aef8..9d422d8068 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -288,6 +288,7 @@ typedef struct { int64_t numOfSTables; int64_t numOfCTables; int64_t numOfNTables; + int64_t numOfCmprTimeSeries; int64_t numOfNTimeSeries; int64_t numOfTimeSeries; int64_t itvTimeSeries; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 8a71327919..4102fa4312 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -36,6 +36,7 @@ static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME); + static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) { pInfo->uid = pEntry->uid; pInfo->version = pEntry->version; @@ -736,6 +737,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs metaReaderClear(&mr); // build SMetaEntry + SVnodeStats *pStats = &pMeta->pVnode->config.vndStats; me.version = ver; me.type = pReq->type; me.uid = pReq->uid; @@ -769,7 +771,9 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs } #endif - ++pMeta->pVnode->config.vndStats.numOfCTables; + ++pStats->numOfCTables; + + pStats->numOfTimeSeries += 2; // 2 cols for test. metaWLock(pMeta); metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1); @@ -784,11 +788,18 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs me.ntbEntry.schemaRow = pReq->ntb.schemaRow; me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1; - ++pMeta->pVnode->config.vndStats.numOfNTables; - pMeta->pVnode->config.vndStats.numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1; + ++pStats->numOfNTables; + pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1; } if (metaHandleEntry(pMeta, &me) < 0) goto _err; + // assert(pStats->numOfTimeSeries + pStats->numOfNTimeSeries < 200000); + // if (pStats->numOfTimeSeries + pStats->numOfNTimeSeries - pStats->numOfCmprTimeSeries > 100) { + // pStats->numOfCmprTimeSeries = pStats->numOfTimeSeries + pStats->numOfNTimeSeries; + // SDndNotifyInfo dNotifyInfo = {.vgId = pMeta->pVnode->config.vgId, + // .nTimeSeries = pStats->numOfTimeSeries + pStats->numOfNTimeSeries}; + // dmProcessNotifyReq(&dNotifyInfo); + // } atomic_val_compare_exchange_8(&tsNeedUpdStatus, 0, 1); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 16a21ca405..be4941d444 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -388,7 +388,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); - pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 0); + pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1); pLoad->totalStorage = (int64_t)3 * 1073741824; pLoad->compStorage = (int64_t)2 * 1073741824; pLoad->pointsWritten = 100;