Merge pull request #22879 from taosdata/enh/TS-3812-3.0
enh: timeseries calculation
This commit is contained in:
commit
7a711bc687
|
@ -145,6 +145,7 @@ extern bool tsUseAdapter;
|
||||||
extern int32_t tsMetaCacheMaxSize;
|
extern int32_t tsMetaCacheMaxSize;
|
||||||
extern int32_t tsSlowLogThreshold;
|
extern int32_t tsSlowLogThreshold;
|
||||||
extern int32_t tsSlowLogScope;
|
extern int32_t tsSlowLogScope;
|
||||||
|
extern int32_t tsTimeSeriesThreshold;
|
||||||
|
|
||||||
// client
|
// client
|
||||||
extern int32_t tsMinSlidingTime;
|
extern int32_t tsMinSlidingTime;
|
||||||
|
|
|
@ -30,6 +30,8 @@ extern "C" {
|
||||||
#define GRANTS_COL_MAX_LEN 196
|
#define GRANTS_COL_MAX_LEN 196
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define GRANT_HEART_BEAT_MIN 2
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_GRANT_ALL,
|
TSDB_GRANT_ALL,
|
||||||
TSDB_GRANT_TIME,
|
TSDB_GRANT_TIME,
|
||||||
|
|
|
@ -1464,6 +1464,11 @@ typedef struct {
|
||||||
int32_t learnerProgress; // use one reservered
|
int32_t learnerProgress; // use one reservered
|
||||||
} SVnodeLoad;
|
} SVnodeLoad;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t vgId;
|
||||||
|
int64_t nTimeSeries;
|
||||||
|
} SVnodeLoadLite;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t syncState;
|
int8_t syncState;
|
||||||
int64_t syncTerm;
|
int64_t syncTerm;
|
||||||
|
@ -1511,6 +1516,16 @@ int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
||||||
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
||||||
void tFreeSStatusReq(SStatusReq* pReq);
|
void tFreeSStatusReq(SStatusReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t dnodeId;
|
||||||
|
int64_t clusterId;
|
||||||
|
SArray* pVloads;
|
||||||
|
} SNotifyReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSNotifyReq(void* buf, int32_t bufLen, SNotifyReq* pReq);
|
||||||
|
int32_t tDeserializeSNotifyReq(void* buf, int32_t bufLen, SNotifyReq* pReq);
|
||||||
|
void tFreeSNotifyReq(SNotifyReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
int64_t clusterId;
|
int64_t clusterId;
|
||||||
|
|
|
@ -179,8 +179,7 @@ enum { // WARN: new msg should be appended to segment tail
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_NOTIFY, "notify", NULL, NULL)
|
||||||
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
|
||||||
|
@ -189,6 +188,8 @@ enum { // WARN: new msg should be appended to segment tail
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
|
||||||
|
|
|
@ -142,6 +142,7 @@ typedef struct SSnapContext {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int64_t ctbNum;
|
int64_t ctbNum;
|
||||||
|
int32_t colNum;
|
||||||
} SMetaStbStats;
|
} SMetaStbStats;
|
||||||
|
|
||||||
// void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
// void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
||||||
|
@ -285,8 +286,8 @@ typedef struct SStoreMeta {
|
||||||
|
|
||||||
// db name, vgId, numOfTables, numOfSTables
|
// db name, vgId, numOfTables, numOfSTables
|
||||||
int32_t (*getNumOfChildTables)(
|
int32_t (*getNumOfChildTables)(
|
||||||
void* pVnode, int64_t uid,
|
void* pVnode, int64_t uid, int64_t* numOfTables,
|
||||||
int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
|
int32_t* numOfCols); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
|
||||||
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
|
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
|
||||||
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
|
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
|
||||||
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
|
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
|
||||||
|
|
|
@ -191,7 +191,7 @@ typedef struct {
|
||||||
} SMonBmInfo;
|
} SMonBmInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SArray *pVloads; // SVnodeLoad
|
SArray *pVloads; // SVnodeLoad/SVnodeLoadLite
|
||||||
} SMonVloadInfo;
|
} SMonVloadInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -46,7 +46,7 @@ typedef HANDLE TdThreadMutexAttr; // windows api
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRWLOCK lock;
|
SRWLOCK lock;
|
||||||
int8_t excl;
|
int8_t excl;
|
||||||
} TdThreadRwlock; // pthread api
|
} TdThreadRwlock; // windows api
|
||||||
typedef pthread_attr_t TdThreadAttr; // pthread api
|
typedef pthread_attr_t TdThreadAttr; // pthread api
|
||||||
typedef pthread_once_t TdThreadOnce; // pthread api
|
typedef pthread_once_t TdThreadOnce; // pthread api
|
||||||
typedef HANDLE TdThreadRwlockAttr; // windows api
|
typedef HANDLE TdThreadRwlockAttr; // windows api
|
||||||
|
|
|
@ -144,6 +144,7 @@ bool tsUseAdapter = false;
|
||||||
int32_t tsMetaCacheMaxSize = -1; // MB
|
int32_t tsMetaCacheMaxSize = -1; // MB
|
||||||
int32_t tsSlowLogThreshold = 3; // seconds
|
int32_t tsSlowLogThreshold = 3; // seconds
|
||||||
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
|
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
|
||||||
|
int32_t tsTimeSeriesThreshold = 50;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
||||||
|
@ -635,6 +636,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX,
|
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX,
|
||||||
CFG_SCOPE_SERVER) != 0)
|
CFG_SCOPE_SERVER) != 0)
|
||||||
|
@ -1043,6 +1045,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32;
|
tsTrimVDbIntervalSec = cfgGetItem(pCfg, "trimVDbIntervalSec")->i32;
|
||||||
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
|
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
|
||||||
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
|
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
|
||||||
|
tsTimeSeriesThreshold = cfgGetItem(pCfg, "timeseriesThreshold")->i32;
|
||||||
|
|
||||||
tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64;
|
tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64;
|
||||||
|
|
||||||
|
@ -1458,6 +1461,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
|
||||||
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
|
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
|
||||||
} else if (strcasecmp("ttlFlushThreshold", name) == 0) {
|
} else if (strcasecmp("ttlFlushThreshold", name) == 0) {
|
||||||
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
|
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
|
||||||
|
} else if (strcasecmp("timeseriesThreshold", name) == 0) {
|
||||||
|
tsTimeSeriesThreshold = cfgGetItem(pCfg, "timeseriesThreshold")->i32;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1035,6 +1035,68 @@ int32_t tDeserializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->clusterId) < 0) return -1;
|
||||||
|
|
||||||
|
int32_t nVgroup = taosArrayGetSize(pReq->pVloads);
|
||||||
|
if (tEncodeI32(&encoder, nVgroup) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < nVgroup; ++i) {
|
||||||
|
SVnodeLoadLite *vload = TARRAY_GET_ELEM(pReq->pVloads, i);
|
||||||
|
if (tEncodeI32(&encoder, vload->vgId) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, vload->nTimeSeries) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
|
||||||
|
int32_t code = TSDB_CODE_INVALID_MSG;
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) goto _exit;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) goto _exit;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->clusterId) < 0) goto _exit;
|
||||||
|
int32_t nVgroup = 0;
|
||||||
|
if (tDecodeI32(&decoder, &nVgroup) < 0) goto _exit;
|
||||||
|
if (nVgroup > 0) {
|
||||||
|
pReq->pVloads = taosArrayInit(nVgroup, sizeof(SVnodeLoadLite));
|
||||||
|
if (!pReq->pVloads) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < nVgroup; ++i) {
|
||||||
|
SVnodeLoadLite vload;
|
||||||
|
if (tDecodeI32(&decoder, &(vload.vgId)) < 0) goto _exit;
|
||||||
|
if (tDecodeI64(&decoder, &(vload.nTimeSeries)) < 0) goto _exit;
|
||||||
|
taosArrayPush(pReq->pVloads, &vload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tFreeSNotifyReq(SNotifyReq *pReq) {
|
||||||
|
if (pReq) {
|
||||||
|
taosArrayDestroy(pReq->pVloads);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
|
@ -28,6 +28,7 @@ typedef struct SDnodeMgmt {
|
||||||
const char *path;
|
const char *path;
|
||||||
const char *name;
|
const char *name;
|
||||||
TdThread statusThread;
|
TdThread statusThread;
|
||||||
|
TdThread notifyThread;
|
||||||
TdThread monitorThread;
|
TdThread monitorThread;
|
||||||
TdThread crashReportThread;
|
TdThread crashReportThread;
|
||||||
SSingleWorker mgmtWorker;
|
SSingleWorker mgmtWorker;
|
||||||
|
@ -36,6 +37,7 @@ typedef struct SDnodeMgmt {
|
||||||
ProcessDropNodeFp processDropNodeFp;
|
ProcessDropNodeFp processDropNodeFp;
|
||||||
SendMonitorReportFp sendMonitorReportFp;
|
SendMonitorReportFp sendMonitorReportFp;
|
||||||
GetVnodeLoadsFp getVnodeLoadsFp;
|
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||||
|
GetVnodeLoadsFp getVnodeLoadsLiteFp;
|
||||||
GetMnodeLoadsFp getMnodeLoadsFp;
|
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||||
GetQnodeLoadsFp getQnodeLoadsFp;
|
GetQnodeLoadsFp getQnodeLoadsFp;
|
||||||
int32_t statusSeq;
|
int32_t statusSeq;
|
||||||
|
@ -44,17 +46,21 @@ typedef struct SDnodeMgmt {
|
||||||
// dmHandle.c
|
// dmHandle.c
|
||||||
SArray *dmGetMsgHandles();
|
SArray *dmGetMsgHandles();
|
||||||
void dmSendStatusReq(SDnodeMgmt *pMgmt);
|
void dmSendStatusReq(SDnodeMgmt *pMgmt);
|
||||||
|
void dmSendNotifyReq(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg);
|
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg);
|
||||||
|
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// dmWorker.c
|
// dmWorker.c
|
||||||
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
|
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
|
||||||
void dmStopStatusThread(SDnodeMgmt *pMgmt);
|
void dmStopStatusThread(SDnodeMgmt *pMgmt);
|
||||||
|
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt);
|
||||||
|
void dmStopNotifyThread(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);
|
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);
|
||||||
void dmStopMonitorThread(SDnodeMgmt *pMgmt);
|
void dmStopMonitorThread(SDnodeMgmt *pMgmt);
|
||||||
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt);
|
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt);
|
||||||
|
|
|
@ -170,6 +170,36 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
dmProcessStatusRsp(pMgmt, &rpcRsp);
|
dmProcessStatusRsp(pMgmt, &rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dmSendNotifyReq(SDnodeMgmt *pMgmt) {
|
||||||
|
SNotifyReq req = {0};
|
||||||
|
|
||||||
|
taosThreadRwlockRdlock(&pMgmt->pData->lock);
|
||||||
|
req.dnodeId = pMgmt->pData->dnodeId;
|
||||||
|
taosThreadRwlockUnlock(&pMgmt->pData->lock);
|
||||||
|
|
||||||
|
req.clusterId = pMgmt->pData->clusterId;
|
||||||
|
|
||||||
|
SMonVloadInfo vinfo = {0};
|
||||||
|
(*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
|
||||||
|
req.pVloads = vinfo.pVloads;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req);
|
||||||
|
void *pHead = rpcMallocCont(contLen);
|
||||||
|
tSerializeSNotifyReq(pHead, contLen, &req);
|
||||||
|
tFreeSNotifyReq(&req);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {.pCont = pHead,
|
||||||
|
.contLen = contLen,
|
||||||
|
.msgType = TDMT_MND_NOTIFY,
|
||||||
|
.info.ahandle = (void *)0x9527,
|
||||||
|
.info.refId = 0,
|
||||||
|
.info.noResp = 1};
|
||||||
|
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
||||||
|
rpcSendRequest(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
dError("auth rsp is received, but not supported yet");
|
dError("auth rsp is received, but not supported yet");
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -395,6 +425,7 @@ SArray *dmGetMsgHandles() {
|
||||||
|
|
||||||
// Requests handled by MNODE
|
// Requests handled by MNODE
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
|
@ -20,6 +20,11 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
|
||||||
if (dmStartStatusThread(pMgmt) != 0) {
|
if (dmStartStatusThread(pMgmt) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#ifdef TD_ENTERPRISE
|
||||||
|
if (dmStartNotifyThread(pMgmt) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
if (dmStartMonitorThread(pMgmt) != 0) {
|
if (dmStartMonitorThread(pMgmt) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -33,6 +38,7 @@ static void dmStopMgmt(SDnodeMgmt *pMgmt) {
|
||||||
pMgmt->pData->stopped = true;
|
pMgmt->pData->stopped = true;
|
||||||
dmStopMonitorThread(pMgmt);
|
dmStopMonitorThread(pMgmt);
|
||||||
dmStopStatusThread(pMgmt);
|
dmStopStatusThread(pMgmt);
|
||||||
|
dmStopNotifyThread(pMgmt);
|
||||||
dmStopCrashReportThread(pMgmt);
|
dmStopCrashReportThread(pMgmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,6 +58,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
|
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
|
||||||
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
|
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
|
||||||
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
|
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
|
||||||
|
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;
|
||||||
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
|
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
|
||||||
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
|
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,26 @@ static void *dmStatusThreadFp(void *param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsem_t dmNotifySem;
|
||||||
|
static void *dmNotifyThreadFp(void *param) {
|
||||||
|
SDnodeMgmt *pMgmt = param;
|
||||||
|
int64_t lastTime = taosGetTimestampMs();
|
||||||
|
setThreadName("dnode-notify");
|
||||||
|
|
||||||
|
if (tsem_init(&dmNotifySem, 0, 0) != 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
|
||||||
|
|
||||||
|
tsem_wait(&dmNotifySem);
|
||||||
|
dmSendNotifyReq(pMgmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static void *dmMonitorThreadFp(void *param) {
|
static void *dmMonitorThreadFp(void *param) {
|
||||||
SDnodeMgmt *pMgmt = param;
|
SDnodeMgmt *pMgmt = param;
|
||||||
int64_t lastTime = taosGetTimestampMs();
|
int64_t lastTime = taosGetTimestampMs();
|
||||||
|
@ -132,7 +152,6 @@ static void *dmCrashReportThreadFp(void *param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
|
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
|
||||||
TdThreadAttr thAttr;
|
TdThreadAttr thAttr;
|
||||||
taosThreadAttrInit(&thAttr);
|
taosThreadAttrInit(&thAttr);
|
||||||
|
@ -154,6 +173,29 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
taosThreadAttrInit(&thAttr);
|
||||||
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
|
||||||
|
dError("failed to create notify thread since %s", strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
tmsgReportStartup("dnode-notify", "initialized");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
|
||||||
|
if (taosCheckPthreadValid(pMgmt->notifyThread)) {
|
||||||
|
tsem_post(&dmNotifySem);
|
||||||
|
taosThreadJoin(pMgmt->notifyThread, NULL);
|
||||||
|
taosThreadClear(&pMgmt->notifyThread);
|
||||||
|
}
|
||||||
|
tsem_destroy(&dmNotifySem);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
|
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
|
||||||
TdThreadAttr thAttr;
|
TdThreadAttr thAttr;
|
||||||
taosThreadAttrInit(&thAttr);
|
taosThreadAttrInit(&thAttr);
|
||||||
|
@ -251,6 +293,11 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
case TDMT_MND_GRANT:
|
case TDMT_MND_GRANT:
|
||||||
code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
|
code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
|
||||||
break;
|
break;
|
||||||
|
case TDMT_MND_GRANT_NOTIFY:
|
||||||
|
#ifdef MAKE_JENKINS_HAPPY
|
||||||
|
code = dmProcessGrantNotify(NULL, pMsg);
|
||||||
|
#endif
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
dGError("msg:%p, not processed in mgmt queue", pMsg);
|
dGError("msg:%p, not processed in mgmt queue", pMsg);
|
||||||
|
|
|
@ -178,6 +178,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_NOTIFY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -40,6 +40,28 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
|
||||||
taosThreadRwlockUnlock(&pMgmt->lock);
|
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||||
|
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
|
||||||
|
if (!pInfo->pVloads) return;
|
||||||
|
|
||||||
|
taosThreadRwlockRdlock(&pMgmt->lock);
|
||||||
|
|
||||||
|
void *pIter = taosHashIterate(pMgmt->hash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
SVnodeObj **ppVnode = pIter;
|
||||||
|
if (ppVnode == NULL || *ppVnode == NULL) continue;
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
|
SVnodeLoadLite vload = {0};
|
||||||
|
if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
|
||||||
|
taosArrayPush(pInfo->pVloads, &vload);
|
||||||
|
}
|
||||||
|
pIter = taosHashIterate(pMgmt->hash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadRwlockUnlock(&pMgmt->lock);
|
||||||
|
}
|
||||||
|
|
||||||
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||||
SMonVloadInfo vloads = {0};
|
SMonVloadInfo vloads = {0};
|
||||||
vmGetVnodeLoads(pMgmt, &vloads, true);
|
vmGetVnodeLoads(pMgmt, &vloads, true);
|
||||||
|
|
|
@ -119,6 +119,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
// dmMonitor.c
|
// dmMonitor.c
|
||||||
void dmSendMonitorReport();
|
void dmSendMonitorReport();
|
||||||
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
|
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
|
||||||
|
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);
|
||||||
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
|
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
|
||||||
void dmGetQnodeLoads(SQnodeLoad *pInfo);
|
void dmGetQnodeLoads(SQnodeLoad *pInfo);
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo);
|
||||||
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
|
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
|
||||||
|
|
||||||
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset);
|
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset);
|
||||||
|
void vmGetVnodeLoadsLite(void *pMgmt, SMonVloadInfo *pInfo);
|
||||||
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
|
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
|
||||||
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);
|
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);
|
||||||
|
|
||||||
|
|
|
@ -419,6 +419,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
||||||
.processDropNodeFp = dmProcessDropNodeReq,
|
.processDropNodeFp = dmProcessDropNodeReq,
|
||||||
.sendMonitorReportFp = dmSendMonitorReport,
|
.sendMonitorReportFp = dmSendMonitorReport,
|
||||||
.getVnodeLoadsFp = dmGetVnodeLoads,
|
.getVnodeLoadsFp = dmGetVnodeLoads,
|
||||||
|
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
|
||||||
.getMnodeLoadsFp = dmGetMnodeLoads,
|
.getMnodeLoadsFp = dmGetMnodeLoads,
|
||||||
.getQnodeLoadsFp = dmGetQnodeLoads,
|
.getQnodeLoadsFp = dmGetQnodeLoads,
|
||||||
};
|
};
|
||||||
|
|
|
@ -119,6 +119,17 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo) {
|
||||||
|
SDnode *pDnode = dmInstance();
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
|
||||||
|
if (dmMarkWrapper(pWrapper) == 0) {
|
||||||
|
if (pWrapper->pMgmt != NULL) {
|
||||||
|
vmGetVnodeLoadsLite(pWrapper->pMgmt, pInfo);
|
||||||
|
}
|
||||||
|
dmReleaseWrapper(pWrapper);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
|
void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
|
||||||
SDnode *pDnode = dmInstance();
|
SDnode *pDnode = dmInstance();
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
|
||||||
|
|
|
@ -121,6 +121,7 @@ typedef struct {
|
||||||
ProcessDropNodeFp processDropNodeFp;
|
ProcessDropNodeFp processDropNodeFp;
|
||||||
SendMonitorReportFp sendMonitorReportFp;
|
SendMonitorReportFp sendMonitorReportFp;
|
||||||
GetVnodeLoadsFp getVnodeLoadsFp;
|
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||||
|
GetVnodeLoadsFp getVnodeLoadsLiteFp;
|
||||||
GetMnodeLoadsFp getMnodeLoadsFp;
|
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||||
GetQnodeLoadsFp getQnodeLoadsFp;
|
GetQnodeLoadsFp getQnodeLoadsFp;
|
||||||
} SMgmtInputOpt;
|
} SMgmtInputOpt;
|
||||||
|
|
|
@ -71,6 +71,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
|
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
|
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
|
||||||
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
|
static int32_t mndProcessStatusReq(SRpcMsg *pReq);
|
||||||
|
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
|
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
|
@ -80,6 +81,12 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue);
|
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue);
|
||||||
|
|
||||||
|
#ifndef TD_ENTERPRISE
|
||||||
|
static int32_t mndUpdClusterInfo(SRpcMsg *pReq) { return 0; }
|
||||||
|
#else
|
||||||
|
int32_t mndUpdClusterInfo(SRpcMsg *pReq);
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t mndInitDnode(SMnode *pMnode) {
|
int32_t mndInitDnode(SMnode *pMnode) {
|
||||||
SSdbTable table = {
|
SSdbTable table = {
|
||||||
.sdbType = SDB_DNODE,
|
.sdbType = SDB_DNODE,
|
||||||
|
@ -97,6 +104,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
|
||||||
|
@ -543,6 +551,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
|
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
|
||||||
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
|
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
|
||||||
|
|
||||||
|
if (reboot) {
|
||||||
|
tsGrantHBInterval = GRANT_HEART_BEAT_MIN;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
|
for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
|
||||||
SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
|
SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
|
||||||
|
|
||||||
|
@ -676,6 +688,45 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
_OVER:
|
_OVER:
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
taosArrayDestroy(statusReq.pVloads);
|
taosArrayDestroy(statusReq.pVloads);
|
||||||
|
#ifdef MAKE_JENKINS_HAPPY
|
||||||
|
mndUpdClusterInfo(pReq);
|
||||||
|
#endif
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) {
|
||||||
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
SNotifyReq notifyReq = {0};
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if ((code = tDeserializeSNotifyReq(pReq->pCont, pReq->contLen, ¬ifyReq)) != 0) {
|
||||||
|
terrno = code;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t clusterid = mndGetClusterId(pMnode);
|
||||||
|
if (notifyReq.clusterId != 0 && notifyReq.clusterId != clusterid) {
|
||||||
|
code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
|
||||||
|
mWarn("dnode:%d, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 " since %s", notifyReq.dnodeId,
|
||||||
|
notifyReq.clusterId, clusterid, tstrerror(code));
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nVgroup = taosArrayGetSize(notifyReq.pVloads);
|
||||||
|
for (int32_t v = 0; v < nVgroup; ++v) {
|
||||||
|
SVnodeLoadLite *pVload = taosArrayGet(notifyReq.pVloads, v);
|
||||||
|
|
||||||
|
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
|
||||||
|
if (pVgroup != NULL) {
|
||||||
|
pVgroup->numOfTimeSeries = pVload->nTimeSeries;
|
||||||
|
mndReleaseVgroup(pMnode, pVgroup);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_OVER:
|
||||||
|
#ifdef MAKE_JENKINS_HAPPY
|
||||||
|
mndUpdClusterInfo(pReq);
|
||||||
|
#endif
|
||||||
|
tFreeSNotifyReq(¬ifyReq);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1678,6 +1678,10 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
pNew->numOfColumns = pNew->numOfColumns + ncols;
|
pNew->numOfColumns = pNew->numOfColumns + ncols;
|
||||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -86,11 +86,13 @@ void *vnodeGetIdx(void *pVnode);
|
||||||
void *vnodeGetIvtIdx(void *pVnode);
|
void *vnodeGetIvtIdx(void *pVnode);
|
||||||
|
|
||||||
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num);
|
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num);
|
||||||
|
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num);
|
||||||
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
|
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
|
||||||
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
|
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
|
||||||
|
|
||||||
void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
|
int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad);
|
||||||
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
||||||
|
|
||||||
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
@ -134,7 +136,7 @@ bool metaTbInFilterCache(void *pVnode, tb_uid_t suid, int8_t type);
|
||||||
int32_t metaPutTbToFilterCache(void *pVnode, tb_uid_t suid, int8_t type);
|
int32_t metaPutTbToFilterCache(void *pVnode, tb_uid_t suid, int8_t type);
|
||||||
int32_t metaSizeOfTbFilterCache(void *pVnode, int8_t type);
|
int32_t metaSizeOfTbFilterCache(void *pVnode, int8_t type);
|
||||||
|
|
||||||
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables);
|
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t *numOfCols);
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
typedef struct STsdbReader STsdbReader;
|
typedef struct STsdbReader STsdbReader;
|
||||||
|
@ -288,10 +290,10 @@ typedef struct {
|
||||||
int64_t numOfSTables;
|
int64_t numOfSTables;
|
||||||
int64_t numOfCTables;
|
int64_t numOfCTables;
|
||||||
int64_t numOfNTables;
|
int64_t numOfNTables;
|
||||||
int64_t numOfCmprTables;
|
int64_t numOfReportedTimeSeries;
|
||||||
int64_t numOfNTimeSeries;
|
int64_t numOfNTimeSeries;
|
||||||
int64_t numOfTimeSeries;
|
int64_t numOfTimeSeries;
|
||||||
int64_t itvTimeSeries;
|
// int64_t itvTimeSeries;
|
||||||
int64_t pointsWritten;
|
int64_t pointsWritten;
|
||||||
int64_t totalStorage;
|
int64_t totalStorage;
|
||||||
int64_t compStorage;
|
int64_t compStorage;
|
||||||
|
|
|
@ -71,7 +71,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
|
||||||
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
|
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
|
||||||
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
|
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
|
||||||
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
|
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
|
||||||
void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t delta);
|
void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol);
|
||||||
int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, LRUHandle** pHandle);
|
int32_t metaUidFilterCacheGet(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, LRUHandle** pHandle);
|
||||||
|
|
||||||
struct SMeta {
|
struct SMeta {
|
||||||
|
|
|
@ -170,7 +170,8 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
|
||||||
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
||||||
|
|
||||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
int64_t metaGetTimeSeriesNum(SMeta* pMeta, int type);
|
||||||
|
void metaUpdTimeSeriesNum(SMeta* pMeta);
|
||||||
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
|
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
|
||||||
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first);
|
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first);
|
||||||
void metaPauseCtbCursor(SMCtbCursor* pCtbCur);
|
void metaPauseCtbCursor(SMCtbCursor* pCtbCur);
|
||||||
|
|
|
@ -696,22 +696,30 @@ int64_t metaGetTbNum(SMeta *pMeta) {
|
||||||
return pMeta->pVnode->config.vndStats.numOfCTables + pMeta->pVnode->config.vndStats.numOfNTables;
|
return pMeta->pVnode->config.vndStats.numOfCTables + pMeta->pVnode->config.vndStats.numOfNTables;
|
||||||
}
|
}
|
||||||
|
|
||||||
// N.B. Called by statusReq per second
|
void metaUpdTimeSeriesNum(SMeta *pMeta) {
|
||||||
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
|
int64_t nCtbTimeSeries = 0;
|
||||||
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
|
if (vnodeGetTimeSeriesNum(pMeta->pVnode, &nCtbTimeSeries) == 0) {
|
||||||
int64_t nTables = metaGetTbNum(pMeta);
|
atomic_store_64(&pMeta->pVnode->config.vndStats.numOfTimeSeries, nCtbTimeSeries);
|
||||||
if (nTables - pMeta->pVnode->config.vndStats.numOfCmprTables > 100 ||
|
}
|
||||||
pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 ||
|
}
|
||||||
++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) {
|
|
||||||
int64_t num = 0;
|
|
||||||
vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
|
|
||||||
pMeta->pVnode->config.vndStats.numOfTimeSeries = num;
|
|
||||||
|
|
||||||
pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2;
|
static FORCE_INLINE int64_t metaGetTimeSeriesNumImpl(SMeta *pMeta, bool forceUpd) {
|
||||||
pMeta->pVnode->config.vndStats.numOfCmprTables = nTables;
|
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
|
||||||
|
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
|
||||||
|
if (forceUpd || pStats->numOfTimeSeries <= 0) {
|
||||||
|
metaUpdTimeSeriesNum(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
return pStats->numOfTimeSeries + pStats->numOfNTimeSeries;
|
||||||
|
}
|
||||||
|
|
||||||
|
// type: 1 reported timeseries
|
||||||
|
int64_t metaGetTimeSeriesNum(SMeta *pMeta, int type) {
|
||||||
|
int64_t nTimeSeries = metaGetTimeSeriesNumImpl(pMeta, false);
|
||||||
|
if (type == 1) {
|
||||||
|
atomic_store_64(&pMeta->pVnode->config.vndStats.numOfReportedTimeSeries, nTimeSeries);
|
||||||
|
}
|
||||||
|
return nTimeSeries;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -1509,9 +1517,10 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) {
|
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t *numOfCols) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*numOfTables = 0;
|
|
||||||
|
if (!numOfTables && !numOfCols) goto _exit;
|
||||||
|
|
||||||
SVnode *pVnodeObj = pVnode;
|
SVnode *pVnodeObj = pVnode;
|
||||||
metaRLock(pVnodeObj->pMeta);
|
metaRLock(pVnodeObj->pMeta);
|
||||||
|
@ -1520,19 +1529,26 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) {
|
||||||
SMetaStbStats state = {0};
|
SMetaStbStats state = {0};
|
||||||
if (metaStatsCacheGet(pVnodeObj->pMeta, uid, &state) == TSDB_CODE_SUCCESS) {
|
if (metaStatsCacheGet(pVnodeObj->pMeta, uid, &state) == TSDB_CODE_SUCCESS) {
|
||||||
metaULock(pVnodeObj->pMeta);
|
metaULock(pVnodeObj->pMeta);
|
||||||
*numOfTables = state.ctbNum;
|
if (numOfTables) *numOfTables = state.ctbNum;
|
||||||
|
if (numOfCols) *numOfCols = state.colNum;
|
||||||
|
ASSERTS(state.colNum > 0, "vgId:%d, suid:%" PRIi64 " nCols:%d <= 0 in metaCache", TD_VID(pVnodeObj), uid,
|
||||||
|
state.colNum);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
// slow path: search TDB
|
// slow path: search TDB
|
||||||
int64_t ctbNum = 0;
|
int64_t ctbNum = 0;
|
||||||
|
int32_t colNum = 0;
|
||||||
vnodeGetCtbNum(pVnode, uid, &ctbNum);
|
vnodeGetCtbNum(pVnode, uid, &ctbNum);
|
||||||
|
vnodeGetStbColumnNum(pVnode, uid, &colNum);
|
||||||
metaULock(pVnodeObj->pMeta);
|
metaULock(pVnodeObj->pMeta);
|
||||||
*numOfTables = ctbNum;
|
|
||||||
|
if (numOfTables) *numOfTables = ctbNum;
|
||||||
|
if (numOfCols) *numOfCols = colNum;
|
||||||
|
|
||||||
state.uid = uid;
|
state.uid = uid;
|
||||||
state.ctbNum = ctbNum;
|
state.ctbNum = ctbNum;
|
||||||
|
state.colNum = colNum;
|
||||||
|
|
||||||
// upsert the cache
|
// upsert the cache
|
||||||
metaWLock(pVnodeObj->pMeta);
|
metaWLock(pVnodeObj->pMeta);
|
||||||
|
@ -1543,12 +1559,12 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta) {
|
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol) {
|
||||||
SMetaStbStats stats = {0};
|
SMetaStbStats stats = {0};
|
||||||
|
|
||||||
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
|
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
|
||||||
stats.ctbNum += delta;
|
stats.ctbNum += deltaCtb;
|
||||||
|
stats.colNum += deltaCol;
|
||||||
metaStatsCacheUpsert(pMeta, &stats);
|
metaStatsCacheUpsert(pMeta, &stats);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
|
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
|
|
||||||
|
extern tsem_t dmNotifySem;
|
||||||
|
|
||||||
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||||
static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema);
|
||||||
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
|
@ -26,7 +28,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
|
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
|
||||||
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
|
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid);
|
||||||
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
|
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
|
||||||
// opt ins_tables query
|
// opt ins_tables query
|
||||||
static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
|
@ -34,6 +36,7 @@ static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
|
|
||||||
|
|
||||||
static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
|
static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
|
||||||
pInfo->uid = pEntry->uid;
|
pInfo->uid = pEntry->uid;
|
||||||
pInfo->version = pEntry->version;
|
pInfo->version = pEntry->version;
|
||||||
|
@ -191,6 +194,14 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
|
||||||
|
#ifdef TD_ENTERPRISE
|
||||||
|
int64_t nTimeSeries = metaGetTimeSeriesNum(pMeta, 0);
|
||||||
|
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
|
||||||
|
if (deltaTS > tsTimeSeriesThreshold) tsem_post(&dmNotifySem);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
int kLen = 0;
|
int kLen = 0;
|
||||||
|
@ -292,7 +303,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
|
||||||
|
|
||||||
for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
|
for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
|
||||||
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild);
|
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild);
|
||||||
metaDropTableByUid(pMeta, uid, NULL);
|
metaDropTableByUid(pMeta, uid, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop super table
|
// drop super table
|
||||||
|
@ -304,8 +315,12 @@ _drop_super_table:
|
||||||
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn);
|
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn);
|
||||||
tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn);
|
tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn);
|
||||||
|
|
||||||
|
metaStatsCacheDrop(pMeta, pReq->suid);
|
||||||
|
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
metaUpdTimeSeriesNum(pMeta);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pData);
|
tdbFree(pData);
|
||||||
|
@ -376,6 +391,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
|
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
|
||||||
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
|
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
|
||||||
|
|
||||||
|
int32_t deltaCol = pReq->schemaRow.nCols - oStbEntry.stbEntry.schemaRow.nCols;
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
// compare two entry
|
// compare two entry
|
||||||
if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) {
|
if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) {
|
||||||
|
@ -390,8 +407,18 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
|
|
||||||
// metaStatsCacheDrop(pMeta, nStbEntry.uid);
|
// metaStatsCacheDrop(pMeta, nStbEntry.uid);
|
||||||
|
|
||||||
|
if (deltaCol != 0) {
|
||||||
|
metaUpdateStbStats(pMeta, pReq->suid, 0, deltaCol);
|
||||||
|
}
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
if (deltaCol != 0) {
|
||||||
|
int64_t ctbNum;
|
||||||
|
metaGetStbStats(pMeta->pVnode, pReq->suid, &ctbNum, NULL);
|
||||||
|
pMeta->pVnode->config.vndStats.numOfTimeSeries += (ctbNum * deltaCol);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
tdbTbcClose(pTbDbc);
|
tdbTbcClose(pTbDbc);
|
||||||
|
@ -734,6 +761,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
// build SMetaEntry
|
// build SMetaEntry
|
||||||
|
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
|
||||||
me.version = ver;
|
me.version = ver;
|
||||||
me.type = pReq->type;
|
me.type = pReq->type;
|
||||||
me.uid = pReq->uid;
|
me.uid = pReq->uid;
|
||||||
|
@ -767,10 +795,13 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
++pMeta->pVnode->config.vndStats.numOfCTables;
|
++pStats->numOfCTables;
|
||||||
|
int32_t nCols = 0;
|
||||||
|
metaGetStbStats(pMeta->pVnode, me.ctbEntry.suid, 0, &nCols);
|
||||||
|
pStats->numOfTimeSeries += nCols - 1;
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1);
|
metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1, 0);
|
||||||
metaUidCacheClear(pMeta, me.ctbEntry.suid);
|
metaUidCacheClear(pMeta, me.ctbEntry.suid);
|
||||||
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
|
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
@ -782,12 +813,14 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
|
||||||
me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
|
me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
|
||||||
me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
|
me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
|
||||||
|
|
||||||
++pMeta->pVnode->config.vndStats.numOfNTables;
|
++pStats->numOfNTables;
|
||||||
pMeta->pVnode->config.vndStats.numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;
|
pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||||
|
|
||||||
|
metaTimeSeriesNotifyCheck(pMeta);
|
||||||
|
|
||||||
if (pMetaRsp) {
|
if (pMetaRsp) {
|
||||||
*pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
*pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
||||||
|
|
||||||
|
@ -817,7 +850,8 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
|
||||||
void *pData = NULL;
|
void *pData = NULL;
|
||||||
int nData = 0;
|
int nData = 0;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
tb_uid_t uid;
|
tb_uid_t uid = 0;
|
||||||
|
tb_uid_t suid = 0;
|
||||||
int type;
|
int type;
|
||||||
|
|
||||||
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
|
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
|
||||||
|
@ -828,9 +862,19 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
|
||||||
uid = *(tb_uid_t *)pData;
|
uid = *(tb_uid_t *)pData;
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
metaDropTableByUid(pMeta, uid, &type);
|
rc = metaDropTableByUid(pMeta, uid, &type, &suid);
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
if (rc < 0) goto _exit;
|
||||||
|
|
||||||
|
if (type == TSDB_CHILD_TABLE) {
|
||||||
|
int32_t nCols = 0;
|
||||||
|
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
|
||||||
|
if (metaGetStbStats(pMeta->pVnode, suid, NULL, &nCols) == 0) {
|
||||||
|
pStats->numOfTimeSeries -= nCols - 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) {
|
if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) {
|
||||||
taosArrayPush(tbUids, &uid);
|
taosArrayPush(tbUids, &uid);
|
||||||
}
|
}
|
||||||
|
@ -839,20 +883,48 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
|
||||||
*tbUid = uid;
|
*tbUid = uid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
tdbFree(pData);
|
tdbFree(pData);
|
||||||
return 0;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
void metaDropTables(SMeta *pMeta, SArray *tbUids) {
|
void metaDropTables(SMeta *pMeta, SArray *tbUids) {
|
||||||
if (taosArrayGetSize(tbUids) == 0) return;
|
if (taosArrayGetSize(tbUids) == 0) return;
|
||||||
|
|
||||||
|
int64_t nCtbDropped = 0;
|
||||||
|
SSHashObj *suidHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
|
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
|
||||||
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
|
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUids, i);
|
||||||
metaDropTableByUid(pMeta, uid, NULL);
|
tb_uid_t suid = 0;
|
||||||
|
int type;
|
||||||
|
metaDropTableByUid(pMeta, uid, &type, &suid);
|
||||||
|
if (type == TSDB_CHILD_TABLE && suid != 0 && suidHash) {
|
||||||
|
int64_t *pVal = tSimpleHashGet(suidHash, &suid, sizeof(tb_uid_t));
|
||||||
|
if (pVal) {
|
||||||
|
nCtbDropped = *pVal + 1;
|
||||||
|
} else {
|
||||||
|
nCtbDropped = 1;
|
||||||
|
}
|
||||||
|
tSimpleHashPut(suidHash, &suid, sizeof(tb_uid_t), &nCtbDropped, sizeof(int64_t));
|
||||||
|
}
|
||||||
metaDebug("batch drop table:%" PRId64, uid);
|
metaDebug("batch drop table:%" PRId64, uid);
|
||||||
}
|
}
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
// update timeseries
|
||||||
|
void *pCtbDropped = NULL;
|
||||||
|
int32_t iter = 0;
|
||||||
|
while ((pCtbDropped = tSimpleHashIterate(suidHash, pCtbDropped, &iter))) {
|
||||||
|
tb_uid_t *pSuid = tSimpleHashGetKey(pCtbDropped, NULL);
|
||||||
|
int32_t nCols = 0;
|
||||||
|
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
|
||||||
|
if (metaGetStbStats(pMeta->pVnode, *pSuid, NULL, &nCols) == 0) {
|
||||||
|
pStats->numOfTimeSeries -= *(int64_t *)pCtbDropped * (nCols - 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tSimpleHashCleanup(suidHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
|
static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
|
||||||
|
@ -987,7 +1059,7 @@ static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
|
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid) {
|
||||||
void *pData = NULL;
|
void *pData = NULL;
|
||||||
int nData = 0;
|
int nData = 0;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
|
@ -1012,9 +1084,11 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
if (type) *type = e.type;
|
if (type) *type = e.type;
|
||||||
|
|
||||||
if (e.type == TSDB_CHILD_TABLE) {
|
if (e.type == TSDB_CHILD_TABLE) {
|
||||||
|
if (pSuid) *pSuid = e.ctbEntry.suid;
|
||||||
void *tData = NULL;
|
void *tData = NULL;
|
||||||
int tLen = 0;
|
int tLen = 0;
|
||||||
|
|
||||||
|
|
||||||
if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) {
|
if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) {
|
||||||
STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = ((SUidIdxVal *)tData)[0].version};
|
STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = ((SUidIdxVal *)tData)[0].version};
|
||||||
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) {
|
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) {
|
||||||
|
@ -1075,8 +1149,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn);
|
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn);
|
||||||
|
|
||||||
--pMeta->pVnode->config.vndStats.numOfCTables;
|
--pMeta->pVnode->config.vndStats.numOfCTables;
|
||||||
|
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0);
|
||||||
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1);
|
|
||||||
metaUidCacheClear(pMeta, e.ctbEntry.suid);
|
metaUidCacheClear(pMeta, e.ctbEntry.suid);
|
||||||
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
|
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
|
||||||
} else if (e.type == TSDB_NORMAL_TABLE) {
|
} else if (e.type == TSDB_NORMAL_TABLE) {
|
||||||
|
@ -1243,6 +1316,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
pSchema->version++;
|
pSchema->version++;
|
||||||
pSchema->nCols++;
|
pSchema->nCols++;
|
||||||
pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols);
|
pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols);
|
||||||
|
@ -1255,6 +1331,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
strcpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName);
|
strcpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName);
|
||||||
|
|
||||||
++pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
++pMeta->pVnode->config.vndStats.numOfNTimeSeries;
|
||||||
|
metaTimeSeriesNotifyCheck(pMeta);
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||||
if (pColumn == NULL) {
|
if (pColumn == NULL) {
|
||||||
|
|
|
@ -388,7 +388,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||||
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
||||||
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
|
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
|
||||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||||
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1);
|
||||||
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
||||||
pLoad->compStorage = (int64_t)2 * 1073741824;
|
pLoad->compStorage = (int64_t)2 * 1073741824;
|
||||||
pLoad->pointsWritten = 100;
|
pLoad->pointsWritten = 100;
|
||||||
|
@ -400,6 +400,15 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad) {
|
||||||
|
SSyncState syncState = syncGetState(pVnode->sync);
|
||||||
|
if (syncState.state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
pLoad->vgId = TD_VID(pVnode);
|
||||||
|
pLoad->nTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* @brief Reset the statistics value by monitor interval
|
* @brief Reset the statistics value by monitor interval
|
||||||
*
|
*
|
||||||
|
@ -544,8 +553,8 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
|
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
|
||||||
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 1);
|
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0);
|
||||||
if (pSW) {
|
if (pSW) {
|
||||||
*num = pSW->nCols;
|
*num = pSW->nCols;
|
||||||
tDeleteSchemaWrapper(pSW);
|
tDeleteSchemaWrapper(pSW);
|
||||||
|
@ -634,10 +643,8 @@ int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) {
|
||||||
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
|
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
|
||||||
|
|
||||||
int64_t ctbNum = 0;
|
int64_t ctbNum = 0;
|
||||||
metaGetStbStats(pVnode, suid, &ctbNum);
|
int32_t numOfCols = 0;
|
||||||
|
metaGetStbStats(pVnode, suid, &ctbNum, &numOfCols);
|
||||||
int numOfCols = 0;
|
|
||||||
vnodeGetStbColumnNum(pVnode, suid, &numOfCols);
|
|
||||||
|
|
||||||
*num += ctbNum * (numOfCols - 1);
|
*num += ctbNum * (numOfCols - 1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3928,7 +3928,7 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO
|
||||||
pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid);
|
pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid);
|
||||||
|
|
||||||
int64_t numOfChildTables = 0;
|
int64_t numOfChildTables = 0;
|
||||||
pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, uid, &numOfChildTables);
|
pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, uid, &numOfChildTables, NULL);
|
||||||
|
|
||||||
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes);
|
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes);
|
||||||
} else {
|
} else {
|
||||||
|
@ -3979,7 +3979,7 @@ static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, S
|
||||||
pRes->info.id.groupId = groupId;
|
pRes->info.id.groupId = groupId;
|
||||||
|
|
||||||
int64_t ctbNum = 0;
|
int64_t ctbNum = 0;
|
||||||
int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum);
|
int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum, NULL);
|
||||||
fillTableCountScanDataBlock(pSupp, dbName, varDataVal(stbName), ctbNum, pRes);
|
fillTableCountScanDataBlock(pSupp, dbName, varDataVal(stbName), ctbNum, pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue