enh: timeseries calculation optimization

This commit is contained in:
kailixu 2023-09-22 20:07:51 +08:00
parent 3beb9c5c84
commit 8f828e052b
26 changed files with 300 additions and 190 deletions

View File

@ -145,6 +145,7 @@ extern bool tsUseAdapter;
extern int32_t tsMetaCacheMaxSize; extern int32_t tsMetaCacheMaxSize;
extern int32_t tsSlowLogThreshold; extern int32_t tsSlowLogThreshold;
extern int32_t tsSlowLogScope; extern int32_t tsSlowLogScope;
extern int32_t tsTimeSeriesThreshold;
// client // client
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;

View File

@ -1464,6 +1464,11 @@ typedef struct {
int32_t learnerProgress; // use one reservered int32_t learnerProgress; // use one reservered
} SVnodeLoad; } SVnodeLoad;
typedef struct {
int32_t vgId;
int64_t nTimeSeries;
} SVnodeLoadLite;
typedef struct { typedef struct {
int8_t syncState; int8_t syncState;
int64_t syncTerm; int64_t syncTerm;
@ -1512,15 +1517,9 @@ int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
void tFreeSStatusReq(SStatusReq* pReq); void tFreeSStatusReq(SStatusReq* pReq);
typedef struct { typedef struct {
int32_t vgId; int32_t dnodeId;
int64_t nTimeSeries; int64_t clusterId;
} SDndNotifyInfo; SArray* pVloads;
int32_t dmProcessNotifyReq(SDndNotifyInfo* pInfo);
typedef struct {
int32_t nVgroup;
SDndNotifyInfo* payload;
} SNotifyReq; } SNotifyReq;
int32_t tSerializeSNotifyReq(void* buf, int32_t bufLen, SNotifyReq* pReq); int32_t tSerializeSNotifyReq(void* buf, int32_t bufLen, SNotifyReq* pReq);

View File

@ -180,9 +180,6 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_NOTIFY, "notify", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_NOTIFY, "notify", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
@ -191,6 +188,8 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)

View File

@ -142,6 +142,7 @@ typedef struct SSnapContext {
typedef struct { typedef struct {
int64_t uid; int64_t uid;
int64_t ctbNum; int64_t ctbNum;
int32_t colNum;
} SMetaStbStats; } SMetaStbStats;
// void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); // void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
@ -285,8 +286,8 @@ typedef struct SStoreMeta {
// db name, vgId, numOfTables, numOfSTables // db name, vgId, numOfTables, numOfSTables
int32_t (*getNumOfChildTables)( int32_t (*getNumOfChildTables)(
void* pVnode, int64_t uid, void* pVnode, int64_t uid, int64_t* numOfTables,
int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); int32_t* numOfCols); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); // metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);

View File

@ -191,7 +191,7 @@ typedef struct {
} SMonBmInfo; } SMonBmInfo;
typedef struct { typedef struct {
SArray *pVloads; // SVnodeLoad SArray *pVloads; // SVnodeLoad/SVnodeLoadLite
} SMonVloadInfo; } SMonVloadInfo;
typedef struct { typedef struct {

View File

@ -46,7 +46,7 @@ typedef HANDLE TdThreadMutexAttr; // windows api
typedef struct { typedef struct {
SRWLOCK lock; SRWLOCK lock;
int8_t excl; int8_t excl;
} TdThreadRwlock; // pthread api } TdThreadRwlock; // windows api
typedef pthread_attr_t TdThreadAttr; // pthread api typedef pthread_attr_t TdThreadAttr; // pthread api
typedef pthread_once_t TdThreadOnce; // pthread api typedef pthread_once_t TdThreadOnce; // pthread api
typedef HANDLE TdThreadRwlockAttr; // windows api typedef HANDLE TdThreadRwlockAttr; // windows api

View File

@ -144,6 +144,7 @@ bool tsUseAdapter = false;
int32_t tsMetaCacheMaxSize = -1; // MB int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 3; // seconds int32_t tsSlowLogThreshold = 3; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
int32_t tsTimeSeriesThreshold = 50;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
@ -630,6 +631,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX,
CFG_SCOPE_SERVER) != 0) CFG_SCOPE_SERVER) != 0)
@ -1036,6 +1038,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32; tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32;
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
tsTimeSeriesThreshold = cfgGetItem(pCfg, "timeseriesThreshold")->i32;
tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64; tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64;
@ -1448,6 +1451,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32; tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
} else if (strcasecmp("ttlFlushThreshold", name) == 0) { } else if (strcasecmp("ttlFlushThreshold", name) == 0) {
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32; tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
} else if (strcasecmp("timeseriesThreshold", name) == 0) {
tsTimeSeriesThreshold = cfgGetItem(pCfg, "timeseriesThreshold")->i32;
} }
break; break;
} }

View File

@ -1041,10 +1041,15 @@ int32_t tSerializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->nVgroup) < 0) return -1; if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
for (int32_t i = 0; i < pReq->nVgroup; ++i) { if (tEncodeI64(&encoder, pReq->clusterId) < 0) return -1;
if (tEncodeI32(&encoder, (pReq->payload + i)->vgId) < 0) return -1;
if (tEncodeI64(&encoder, (pReq->payload + i)->nTimeSeries) < 0) return -1; int32_t nVgroup = taosArrayGetSize(pReq->pVloads);
if (tEncodeI32(&encoder, nVgroup) < 0) return -1;
for (int32_t i = 0; i < nVgroup; ++i) {
SVnodeLoadLite *vload = TARRAY_GET_ELEM(pReq->pVloads, i);
if (tEncodeI32(&encoder, vload->vgId) < 0) return -1;
if (tEncodeI64(&encoder, vload->nTimeSeries) < 0) return -1;
} }
tEndEncode(&encoder); tEndEncode(&encoder);
@ -1055,29 +1060,39 @@ int32_t tSerializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
} }
int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) { int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
int32_t code = TSDB_CODE_INVALID_MSG;
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) goto _exit;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) goto _exit;
if (tDecodeI32(&decoder, &pReq->nVgroup) < 0) return -1; if (tDecodeI64(&decoder, &pReq->clusterId) < 0) goto _exit;
int32_t nVgroup = 0;
pReq->payload = taosMemoryMalloc(pReq->nVgroup * (sizeof(SDndNotifyInfo))); if (tDecodeI32(&decoder, &nVgroup) < 0) goto _exit;
if (!pReq->payload) return -1; if (nVgroup > 0) {
pReq->pVloads = taosArrayInit(nVgroup, sizeof(SVnodeLoadLite));
for (int32_t i = 0; i < pReq->nVgroup; ++i) { if (!pReq->pVloads) {
if (tDecodeI32(&decoder, &((pReq->payload + i)->vgId)) < 0) return -1; code = TSDB_CODE_OUT_OF_MEMORY;
if (tDecodeI64(&decoder, &((pReq->payload + i)->nTimeSeries)) < 0) return -1; goto _exit;
}
for (int32_t i = 0; i < nVgroup; ++i) {
SVnodeLoadLite *vload = TARRAY_GET_ELEM(pReq->pVloads, i);
if (tDecodeI32(&decoder, &(vload->vgId)) < 0) goto _exit;
if (tDecodeI64(&decoder, &(vload->nTimeSeries)) < 0) goto _exit;
}
} }
code = 0;
_exit:
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return code;
} }
void tFreeSNotifyReq(SNotifyReq *pReq) { void tFreeSNotifyReq(SNotifyReq *pReq) {
if (pReq) { if (pReq) {
taosMemoryFreeClear(pReq->payload); taosArrayDestroy(pReq->pVloads);
} }
} }

View File

@ -28,6 +28,7 @@ typedef struct SDnodeMgmt {
const char *path; const char *path;
const char *name; const char *name;
TdThread statusThread; TdThread statusThread;
TdThread notifyThread;
TdThread monitorThread; TdThread monitorThread;
TdThread crashReportThread; TdThread crashReportThread;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
@ -36,6 +37,7 @@ typedef struct SDnodeMgmt {
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp; SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq; int32_t statusSeq;
@ -44,6 +46,7 @@ typedef struct SDnodeMgmt {
// dmHandle.c // dmHandle.c
SArray *dmGetMsgHandles(); SArray *dmGetMsgHandles();
void dmSendStatusReq(SDnodeMgmt *pMgmt); void dmSendStatusReq(SDnodeMgmt *pMgmt);
void dmSendNotifyReq(SDnodeMgmt *pMgmt);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
@ -51,11 +54,11 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg); int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg);
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg); int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg);
int32_t dmStartNotify(SDnodeMgmt *pMgmt);
// dmWorker.c // dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt); int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt);
void dmStopStatusThread(SDnodeMgmt *pMgmt); void dmStopStatusThread(SDnodeMgmt *pMgmt);
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);
void dmStopMonitorThread(SDnodeMgmt *pMgmt); void dmStopMonitorThread(SDnodeMgmt *pMgmt);

View File

@ -170,6 +170,36 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dmProcessStatusRsp(pMgmt, &rpcRsp); dmProcessStatusRsp(pMgmt, &rpcRsp);
} }
void dmSendNotifyReq(SDnodeMgmt *pMgmt) {
SNotifyReq req = {0};
// taosThreadRwlockRdlock(&pMgmt->pData->lock);
// req.dnodeId = pMgmt->pData->dnodeId;
// taosThreadRwlockUnlock(&pMgmt->pData->lock);
// req.clusterId = pMgmt->pData->clusterId;
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
req.pVloads = vinfo.pVloads;
int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
tSerializeSNotifyReq(pHead, contLen, &req);
tFreeSNotifyReq(&req);
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,
.msgType = TDMT_MND_NOTIFY,
.info.ahandle = (void *)0x9527,
.info.refId = 0,
.info.noResp = 1};
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet);
rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL);
}
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dError("auth rsp is received, but not supported yet"); dError("auth rsp is received, but not supported yet");
return 0; return 0;
@ -408,99 +438,3 @@ _OVER:
return pArray; return pArray;
} }
} }
#ifndef TD_ENTERPRISE
int32_t dmStartNotify(SDnodeMgmt *pMgmt) { return 0; }
int32_t dmProcessNotifyReq(SDndNotifyInfo *pInfo) { return 0; }
#else
static SHashObj *tsDndNotifyInfo = NULL;
static int64_t tsTimeSeries = 0;
static int8_t tsDndNotifyInitLock = 0;
static int8_t tsDndNotifyLock = 0;
static SDnodeMgmt *pSDnodeMgmt = NULL;
int32_t dmStartNotify(SDnodeMgmt *pMgmt) {
pSDnodeMgmt = pMgmt;
return 0;
}
int32_t dmProcessNotifyReq(SDndNotifyInfo *pInfo) {
if (atomic_load_8(&tsDndNotifyInitLock) != 2) {
if (atomic_val_compare_exchange_8(&tsDndNotifyInitLock, 0, 1) == 0) {
tsDndNotifyInfo = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
// fetch the latest tsTimeSeries of vgroups in current dnode
atomic_store_8(&tsDndNotifyInitLock, 2);
} else {
int32_t nLoops = 0;
while (atomic_load_8(&tsDndNotifyInitLock) != 2) {
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
}
}
if (atomic_val_compare_exchange_8(&tsDndNotifyLock, 0, 1) != 0) {
return 0;
}
int64_t lastTimeSeries = atomic_load_64(&tsTimeSeries);
int64_t *val = NULL;
if ((val = taosHashGet(tsDndNotifyInfo, &pInfo->vgId, sizeof(pInfo->vgId)))) {
if (*val != pInfo->nTimeSeries) {
assert(*val > 0);
atomic_add_fetch_64(&tsTimeSeries, pInfo->nTimeSeries - *val);
taosHashPut(tsDndNotifyInfo, &pInfo->vgId, sizeof(pInfo->vgId), &pInfo->nTimeSeries, sizeof(pInfo->nTimeSeries));
}
} else {
atomic_add_fetch_64(&tsTimeSeries, pInfo->nTimeSeries);
taosHashPut(tsDndNotifyInfo, &pInfo->vgId, sizeof(pInfo->vgId), &pInfo->nTimeSeries, sizeof(pInfo->nTimeSeries));
}
if (atomic_load_64(&tsTimeSeries) - lastTimeSeries > 100) {
int32_t hashSize = taosHashGetSize(tsDndNotifyInfo);
SNotifyReq req = {.nVgroup = hashSize};
req.payload = taosMemoryMalloc(sizeof(SDndNotifyInfo) * hashSize);
int64_t *nVal = NULL;
int32_t iter = 0;
int64_t nnn = 0;
size_t kLen = sizeof(int32_t);
while ((nVal = taosHashIterate(tsDndNotifyInfo, nVal)) && iter < hashSize) {
int32_t *nVgId = taosHashGetKey(nVal, &kLen);
(req.payload + iter)->vgId = *nVgId;
(req.payload + iter)->nTimeSeries = *nVal;
++iter;
}
// assert(nnn < 1000000);
int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req);
if (contLen <= 0) return -1;
void *pHead = rpcMallocCont(contLen);
if (!pHead) {
tFreeSNotifyReq(&req);
atomic_store_8(&tsDndNotifyLock, 0);
return -1;
}
tSerializeSNotifyReq(pHead, contLen, &req);
tFreeSNotifyReq(&req);
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,
.msgType = TDMT_MND_NOTIFY,
.info.ahandle = NULL,
.info.refId = 0,
.info.noResp = 1};
SEpSet epSet = {0};
dmGetMnodeEpSet(pSDnodeMgmt->pData, &epSet);
rpcSendRequest(pSDnodeMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL);
}
atomic_store_8(&tsDndNotifyLock, 0);
return 0;
}
#endif

View File

@ -20,15 +20,17 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
if (dmStartStatusThread(pMgmt) != 0) { if (dmStartStatusThread(pMgmt) != 0) {
return -1; return -1;
} }
#ifdef TD_ENTERPRISE
if (dmStartNotifyThread(pMgmt) != 0) {
return -1;
}
#endif
if (dmStartMonitorThread(pMgmt) != 0) { if (dmStartMonitorThread(pMgmt) != 0) {
return -1; return -1;
} }
if (dmStartCrashReportThread(pMgmt) != 0) { if (dmStartCrashReportThread(pMgmt) != 0) {
return -1; return -1;
} }
if(dmStartNotify(pMgmt) != 0) {
return -1;
}
return 0; return 0;
} }
@ -55,6 +57,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp; pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;

View File

@ -17,7 +17,6 @@
#include "dmInt.h" #include "dmInt.h"
#include "thttp.h" #include "thttp.h"
int8_t tsNeedUpdStatus = 0;
static void *dmStatusThreadFp(void *param) { static void *dmStatusThreadFp(void *param) {
SDnodeMgmt *pMgmt = param; SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs(); int64_t lastTime = taosGetTimestampMs();
@ -29,13 +28,13 @@ static void *dmStatusThreadFp(void *param) {
int64_t upTime = 0; int64_t upTime = 0;
while (1) { while (1) {
taosMsleep(100); taosMsleep(200);
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
int64_t curTime = taosGetTimestampMs(); int64_t curTime = taosGetTimestampMs();
if (curTime < lastTime) lastTime = curTime; if (curTime < lastTime) lastTime = curTime;
float interval = (curTime - lastTime) / 1000.0f; float interval = (curTime - lastTime) / 1000.0f;
if (atomic_val_compare_exchange_8(&tsNeedUpdStatus, 1, 0) == 1 || interval >= tsStatusInterval) { if (interval >= tsStatusInterval) {
dmSendStatusReq(pMgmt); dmSendStatusReq(pMgmt);
lastTime = curTime; lastTime = curTime;
@ -54,6 +53,26 @@ static void *dmStatusThreadFp(void *param) {
return NULL; return NULL;
} }
tsem_t dmNotifySem;
static void *dmNotifyThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-notify");
if (tsem_init(&dmNotifySem, 0, 0) != 0) {
return NULL;
}
while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
tsem_wait(&dmNotifySem);
dmSendNotifyReq(pMgmt);
}
return NULL;
}
static void *dmMonitorThreadFp(void *param) { static void *dmMonitorThreadFp(void *param) {
SDnodeMgmt *pMgmt = param; SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs(); int64_t lastTime = taosGetTimestampMs();
@ -154,6 +173,29 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt) {
} }
} }
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
dError("failed to create notify thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-notify", "initialized");
return 0;
}
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->notifyThread)) {
tsem_post(&dmNotifySem);
taosThreadJoin(pMgmt->notifyThread, NULL);
taosThreadClear(&pMgmt->notifyThread);
}
tsem_destroy(&dmNotifySem);
}
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); taosThreadAttrInit(&thAttr);

View File

@ -40,6 +40,28 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
taosThreadRwlockUnlock(&pMgmt->lock); taosThreadRwlockUnlock(&pMgmt->lock);
} }
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
if (!pInfo->pVloads) return;
taosThreadRwlockRdlock(&pMgmt->lock);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
SVnodeLoadLite vload = {0};
if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
taosArrayPush(pInfo->pVloads, &vload);
}
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosThreadRwlockUnlock(&pMgmt->lock);
}
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
SMonVloadInfo vloads = {0}; SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads, true); vmGetVnodeLoads(pMgmt, &vloads, true);

View File

@ -119,6 +119,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// dmMonitor.c // dmMonitor.c
void dmSendMonitorReport(); void dmSendMonitorReport();
void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SMonMloadInfo *pInfo); void dmGetMnodeLoads(SMonMloadInfo *pInfo);
void dmGetQnodeLoads(SQnodeLoad *pInfo); void dmGetQnodeLoads(SQnodeLoad *pInfo);

View File

@ -35,6 +35,7 @@ void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo);
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo); void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset); void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset);
void vmGetVnodeLoadsLite(void *pMgmt, SMonVloadInfo *pInfo);
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo); void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);

View File

@ -419,6 +419,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.processDropNodeFp = dmProcessDropNodeReq, .processDropNodeFp = dmProcessDropNodeReq,
.sendMonitorReportFp = dmSendMonitorReport, .sendMonitorReportFp = dmSendMonitorReport,
.getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsFp = dmGetVnodeLoads,
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
.getMnodeLoadsFp = dmGetMnodeLoads, .getMnodeLoadsFp = dmGetMnodeLoads,
.getQnodeLoadsFp = dmGetQnodeLoads, .getQnodeLoadsFp = dmGetQnodeLoads,
}; };

View File

@ -119,6 +119,17 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
} }
} }
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo) {
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
if (dmMarkWrapper(pWrapper) == 0) {
if (pWrapper->pMgmt != NULL) {
vmGetVnodeLoadsLite(pWrapper->pMgmt, pInfo);
}
dmReleaseWrapper(pWrapper);
}
}
void dmGetMnodeLoads(SMonMloadInfo *pInfo) { void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE]; SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];

View File

@ -92,8 +92,6 @@ typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
int32_t dmProcessNotifyReq(SDndNotifyInfo *pInfo);
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
@ -123,6 +121,7 @@ typedef struct {
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp; SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp;
} SMgmtInputOpt; } SMgmtInputOpt;

View File

@ -70,6 +70,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp); static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessStatusReq(SRpcMsg *pReq); static int32_t mndProcessStatusReq(SRpcMsg *pReq);
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq); static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
@ -80,11 +81,9 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue); static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue);
#ifndef TD_ENTERPRISE #ifndef TD_ENTERPRISE
static int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; } static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) { return 0; }
#else #else
int32_t mndUpdateClusterInfo(SRpcMsg *pReq); int32_t mndUpdClusterInfo(SRpcMsg *pReq);
int32_t mndProcessNotifyReq(SRpcMsg *pReq);
#endif #endif
int32_t mndInitDnode(SMnode *pMnode) { int32_t mndInitDnode(SMnode *pMnode) {
@ -540,7 +539,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
int64_t nDiffTimeSeries = 0;
bool online = mndIsDnodeOnline(pDnode, curMs); bool online = mndIsDnodeOnline(pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer); bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
@ -557,7 +555,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId); SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
if (pVgroup != NULL) { if (pVgroup != NULL) {
nDiffTimeSeries = pVload->numOfTimeSeries - pVgroup->numOfTimeSeries;
if (pVload->syncState == TAOS_SYNC_STATE_LEADER) { if (pVload->syncState == TAOS_SYNC_STATE_LEADER) {
pVgroup->cacheUsage = pVload->cacheUsage; pVgroup->cacheUsage = pVload->cacheUsage;
pVgroup->numOfCachedTables = pVload->numOfCachedTables; pVgroup->numOfCachedTables = pVload->numOfCachedTables;
@ -686,9 +683,41 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
_OVER: _OVER:
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
taosArrayDestroy(statusReq.pVloads); taosArrayDestroy(statusReq.pVloads);
if (nDiffTimeSeries > 0) { mndUpdClusterInfo(pReq);
mndUpdateClusterInfo(pReq); return code;
}
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SNotifyReq notifyReq = {0};
int32_t code = 0;
if ((code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, &notifyReq)) != 0) {
terrno = code;
goto _OVER;
} }
// int64_t clusterid = mndGetClusterId(pMnode);
// if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
// code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
// mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
// notifyReq.clusterId, clusterid, tstrerror(code));
// goto _OVER;
// }
int32_t nVgroup = taosArrayGetSize(notifyReq.pVloads);
for (int32_t v = 0; v < nVgroup; ++v) {
SVnodeLoadLite *pVload = taosArrayGet(notifyReq.pVloads, v);
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
if (pVgroup != NULL) {
pVgroup->numOfTimeSeries = pVload->nTimeSeries;
mndReleaseVgroup(pMnode, pVgroup);
}
}
_OVER:
mndUpdClusterInfo(pReq);
tFreeSNotifyReq(&notifyReq);
return code; return code;
} }

View File

@ -86,11 +86,13 @@ void *vnodeGetIdx(void *pVnode);
void *vnodeGetIvtIdx(void *pVnode); void *vnodeGetIvtIdx(void *pVnode);
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num); int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num);
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num);
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad); void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad);
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg);
@ -134,7 +136,7 @@ bool metaTbInFilterCache(void *pVnode, tb_uid_t suid, int8_t type);
int32_t metaPutTbToFilterCache(void *pVnode, tb_uid_t suid, int8_t type); int32_t metaPutTbToFilterCache(void *pVnode, tb_uid_t suid, int8_t type);
int32_t metaSizeOfTbFilterCache(void *pVnode, int8_t type); int32_t metaSizeOfTbFilterCache(void *pVnode, int8_t type);
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables); int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t *numOfCols);
// tsdb // tsdb
typedef struct STsdbReader STsdbReader; typedef struct STsdbReader STsdbReader;
@ -288,10 +290,10 @@ typedef struct {
int64_t numOfSTables; int64_t numOfSTables;
int64_t numOfCTables; int64_t numOfCTables;
int64_t numOfNTables; int64_t numOfNTables;
int64_t numOfCmprTimeSeries; int64_t numOfReportedTimeSeries;
int64_t numOfNTimeSeries; int64_t numOfNTimeSeries;
int64_t numOfTimeSeries; int64_t numOfTimeSeries;
int64_t itvTimeSeries; // int64_t itvTimeSeries;
int64_t pointsWritten; int64_t pointsWritten;
int64_t totalStorage; int64_t totalStorage;
int64_t compStorage; int64_t compStorage;

View File

@ -71,7 +71,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo); int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t delta); void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t deltaCtb, int32_t totalCols);
int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, LRUHandle** pHandle); int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, LRUHandle** pHandle);
struct SMeta { struct SMeta {

View File

@ -171,6 +171,7 @@ int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type); int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type);
void metaUpdTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock); SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first); int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first);
void metaPauseCtbCursor(SMCtbCursor* pCtbCur); void metaPauseCtbCursor(SMCtbCursor* pCtbCur);

View File

@ -696,19 +696,31 @@ int64_t metaGetTbNum(SMeta *pMeta) {
return pMeta->pVnode->config.vndStats.numOfCTables + pMeta->pVnode->config.vndStats.numOfNTables; return pMeta->pVnode->config.vndStats.numOfCTables + pMeta->pVnode->config.vndStats.numOfNTables;
} }
// N.B. Called by statusReq per second void metaUpdTimeSeriesNum(SMeta *pMeta) {
int64_t metaGetTimeSeriesNum(SMeta *pMeta, int type) { SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column) int64_t nCtbTimeSeries = 0;
if (type || pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 ||
++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) {
int64_t num = 0;
vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
pMeta->pVnode->config.vndStats.numOfTimeSeries = num;
pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2; vnodeGetTimeSeriesNum(pMeta->pVnode, &nCtbTimeSeries);
atomic_store_64(&pStats->numOfTimeSeries, nCtbTimeSeries);
}
static int64_t metaGetTimeSeriesNumImpl(SMeta *pMeta, bool forceUpd) {
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
if (forceUpd || pStats->numOfTimeSeries < 0) {
metaUpdTimeSeriesNum(pMeta);
} }
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries; return pStats->numOfTimeSeries + pStats->numOfNTimeSeries;
}
// type: 1 report timeseries
int64_t metaGetTimeSeriesNum(SMeta *pMeta, int type) {
int64_t nTimeSeries = metaGetTimeSeriesNumImpl(pMeta, false);
if (type == 1) {
atomic_store_64(&pMeta->pVnode->config.vndStats.numOfReportedTimeSeries, nTimeSeries);
}
return nTimeSeries;
} }
typedef struct { typedef struct {
@ -1506,9 +1518,10 @@ _exit:
return code; return code;
} }
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) { int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t *numOfCols) {
int32_t code = 0; int32_t code = 0;
*numOfTables = 0;
if (!numOfTables && !numOfCols) goto _exit;
SVnode *pVnodeObj = pVnode; SVnode *pVnodeObj = pVnode;
metaRLock(pVnodeObj->pMeta); metaRLock(pVnodeObj->pMeta);
@ -1517,7 +1530,8 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) {
SMetaStbStats state = {0}; SMetaStbStats state = {0};
if (metaStatsCacheGet(pVnodeObj->pMeta, uid, &state) == TSDB_CODE_SUCCESS) { if (metaStatsCacheGet(pVnodeObj->pMeta, uid, &state) == TSDB_CODE_SUCCESS) {
metaULock(pVnodeObj->pMeta); metaULock(pVnodeObj->pMeta);
*numOfTables = state.ctbNum; if (numOfTables) *numOfTables = state.ctbNum;
if (numOfCols) *numOfCols = state.colNum;
goto _exit; goto _exit;
} }
@ -1526,10 +1540,15 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) {
vnodeGetCtbNum(pVnode, uid, &ctbNum); vnodeGetCtbNum(pVnode, uid, &ctbNum);
metaULock(pVnodeObj->pMeta); metaULock(pVnodeObj->pMeta);
*numOfTables = ctbNum; if (numOfTables) *numOfTables = ctbNum;
int32_t colNum = 0;
vnodeGetStbColumnNum(pVnode, uid, &colNum);
if (numOfCols) *numOfCols = colNum;
state.uid = uid; state.uid = uid;
state.ctbNum = ctbNum; state.ctbNum = ctbNum;
state.colNum = colNum;
// upsert the cache // upsert the cache
metaWLock(pVnodeObj->pMeta); metaWLock(pVnodeObj->pMeta);
@ -1540,11 +1559,12 @@ _exit:
return code; return code;
} }
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta) { void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t deltaCtb, int32_t totalCols) {
SMetaStbStats stats = {0}; SMetaStbStats stats = {0};
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) { if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
stats.ctbNum += delta; stats.ctbNum += deltaCtb;
if (totalCols > 0) stats.colNum = totalCols;
metaStatsCacheUpsert(pMeta, &stats); metaStatsCacheUpsert(pMeta, &stats);
} }

View File

@ -15,7 +15,7 @@
#include "meta.h" #include "meta.h"
extern int8_t tsNeedUpdStatus; extern tsem_t dmNotifySem;
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema); static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
@ -194,6 +194,16 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
return 0; return 0;
} }
static inline void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
#ifdef TD_ENTERPRISE
int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
if (abs(deltaTS) > tsTimeSeriesThreshold) {
tsem_post(&dmNotifySem);
}
#endif
}
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry me = {0}; SMetaEntry me = {0};
int kLen = 0; int kLen = 0;
@ -393,6 +403,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
// metaStatsCacheDrop(pMeta, nStbEntry.uid); // metaStatsCacheDrop(pMeta, nStbEntry.uid);
metaUpdateStbStats(pMeta, pReq->suid, 0, pReq->schemaRow.nCols);
metaULock(pMeta); metaULock(pMeta);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf); if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
@ -773,10 +785,12 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
++pStats->numOfCTables; ++pStats->numOfCTables;
pStats->numOfTimeSeries += 2; // 2 cols for test. int32_t nCols = 0;
metaGetStbStats(pMeta->pVnode, me.ctbEntry.suid, 0, &nCols);
pStats->numOfTimeSeries = pStats->numOfCTables * (nCols - 1);
metaWLock(pMeta); metaWLock(pMeta);
metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1); metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1, 0);
metaUidCacheClear(pMeta, me.ctbEntry.suid); metaUidCacheClear(pMeta, me.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid); metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
metaULock(pMeta); metaULock(pMeta);
@ -793,15 +807,8 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
} }
if (metaHandleEntry(pMeta, &me) < 0) goto _err; if (metaHandleEntry(pMeta, &me) < 0) goto _err;
// assert(pStats->numOfTimeSeries + pStats->numOfNTimeSeries < 200000);
// if (pStats->numOfTimeSeries + pStats->numOfNTimeSeries - pStats->numOfCmprTimeSeries > 100) {
// pStats->numOfCmprTimeSeries = pStats->numOfTimeSeries + pStats->numOfNTimeSeries;
// SDndNotifyInfo dNotifyInfo = {.vgId = pMeta->pVnode->config.vgId,
// .nTimeSeries = pStats->numOfTimeSeries + pStats->numOfNTimeSeries};
// dmProcessNotifyReq(&dNotifyInfo);
// }
atomic_val_compare_exchange_8(&tsNeedUpdStatus, 0, 1); metaTimeSeriesNotifyCheck(pMeta);
if (pMetaRsp) { if (pMetaRsp) {
*pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp)); *pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
@ -1086,19 +1093,23 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
if (e.type != TSDB_SUPER_TABLE) metaDeleteTtl(pMeta, &e); if (e.type != TSDB_SUPER_TABLE) metaDeleteTtl(pMeta, &e);
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
if (e.type == TSDB_CHILD_TABLE) { if (e.type == TSDB_CHILD_TABLE) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn); tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn);
--pMeta->pVnode->config.vndStats.numOfCTables; --pStats->numOfCTables;
int32_t nCols = 0;
metaGetStbStats(pMeta->pVnode, e.ctbEntry.suid, 0, &nCols);
pStats->numOfTimeSeries = pStats->numOfCTables * (nCols - 1);
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1); metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0);
metaUidCacheClear(pMeta, e.ctbEntry.suid); metaUidCacheClear(pMeta, e.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid); metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
} else if (e.type == TSDB_NORMAL_TABLE) { } else if (e.type == TSDB_NORMAL_TABLE) {
// drop schema.db (todo) // drop schema.db (todo)
--pMeta->pVnode->config.vndStats.numOfNTables; --pStats->numOfNTables;
pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1; pStats->numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1;
} else if (e.type == TSDB_SUPER_TABLE) { } else if (e.type == TSDB_SUPER_TABLE) {
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn); tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn);
// drop schema.db (todo) // drop schema.db (todo)
@ -1106,13 +1117,14 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
metaStatsCacheDrop(pMeta, uid); metaStatsCacheDrop(pMeta, uid);
metaUidCacheClear(pMeta, uid); metaUidCacheClear(pMeta, uid);
metaTbGroupCacheClear(pMeta, uid); metaTbGroupCacheClear(pMeta, uid);
--pMeta->pVnode->config.vndStats.numOfSTables; metaUpdTimeSeriesNum(pMeta);
--pStats->numOfSTables;
} }
atomic_val_compare_exchange_8(&tsNeedUpdStatus, 0, 1);
metaCacheDrop(pMeta, uid); metaCacheDrop(pMeta, uid);
metaTimeSeriesNotifyCheck(pMeta);
tDecoderClear(&dc); tDecoderClear(&dc);
tdbFree(pData); tdbFree(pData);
@ -1272,6 +1284,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
strcpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName); strcpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName);
++pMeta->pVnode->config.vndStats.numOfNTimeSeries; ++pMeta->pVnode->config.vndStats.numOfNTimeSeries;
metaTimeSeriesNotifyCheck(pMeta);
break; break;
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
if (pColumn == NULL) { if (pColumn == NULL) {
@ -1294,6 +1307,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
pSchema->nCols--; pSchema->nCols--;
--pMeta->pVnode->config.vndStats.numOfNTimeSeries; --pMeta->pVnode->config.vndStats.numOfNTimeSeries;
metaTimeSeriesNotifyCheck(pMeta);
break; break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
if (pColumn == NULL) { if (pColumn == NULL) {

View File

@ -400,6 +400,15 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
return 0; return 0;
} }
int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad) {
SSyncState syncState = syncGetState(pVnode->sync);
if (syncState.state == TAOS_SYNC_STATE_LEADER) {
pLoad->vgId = TD_VID(pVnode);
pLoad->nTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1);
return 0;
}
return -1;
}
/** /**
* @brief Reset the statistics value by monitor interval * @brief Reset the statistics value by monitor interval
* *
@ -544,7 +553,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) { int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 1); SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 1);
if (pSW) { if (pSW) {
*num = pSW->nCols; *num = pSW->nCols;
@ -634,10 +643,8 @@ int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
int64_t ctbNum = 0; int64_t ctbNum = 0;
metaGetStbStats(pVnode, suid, &ctbNum); int32_t numOfCols = 0;
metaGetStbStats(pVnode, suid, &ctbNum, &numOfCols);
int numOfCols = 0;
vnodeGetStbColumnNum(pVnode, suid, &numOfCols);
*num += ctbNum * (numOfCols - 1); *num += ctbNum * (numOfCols - 1);
} }

View File

@ -3928,7 +3928,7 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO
pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid); pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid);
int64_t numOfChildTables = 0; int64_t numOfChildTables = 0;
pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, uid, &numOfChildTables); pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, uid, &numOfChildTables, NULL);
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes); fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes);
} else { } else {
@ -3979,7 +3979,7 @@ static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, S
pRes->info.id.groupId = groupId; pRes->info.id.groupId = groupId;
int64_t ctbNum = 0; int64_t ctbNum = 0;
int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum); int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum, NULL);
fillTableCountScanDataBlock(pSupp, dbName, varDataVal(stbName), ctbNum, pRes); fillTableCountScanDataBlock(pSupp, dbName, varDataVal(stbName), ctbNum, pRes);
} }