fix: first round number of chile tables and timeseries
This commit is contained in:
parent
e95ae14836
commit
cfab6c9b20
|
@ -66,6 +66,10 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
|
|||
void *vnodeGetIdx(SVnode *pVnode);
|
||||
void *vnodeGetIvtIdx(SVnode *pVnode);
|
||||
|
||||
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num);
|
||||
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
|
||||
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
||||
|
||||
|
@ -210,26 +214,37 @@ struct STsdbCfg {
|
|||
SRetention retentions[TSDB_RETENTION_MAX];
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int64_t numOfSTables;
|
||||
int64_t numOfCTables;
|
||||
int64_t numOfNTables;
|
||||
int64_t numOfTimeSeries;
|
||||
int64_t pointsWritten;
|
||||
int64_t totalStorage;
|
||||
int64_t compStorage;
|
||||
} SVnodeStats;
|
||||
|
||||
struct SVnodeCfg {
|
||||
int32_t vgId;
|
||||
char dbname[TSDB_DB_FNAME_LEN];
|
||||
uint64_t dbId;
|
||||
int32_t cacheLastSize;
|
||||
int32_t szPage;
|
||||
int32_t szCache;
|
||||
uint64_t szBuf;
|
||||
bool isHeap;
|
||||
bool isWeak;
|
||||
int8_t cacheLast;
|
||||
int8_t isTsma;
|
||||
int8_t isRsma;
|
||||
int8_t hashMethod;
|
||||
int8_t standby;
|
||||
STsdbCfg tsdbCfg;
|
||||
SWalCfg walCfg;
|
||||
SSyncCfg syncCfg;
|
||||
uint32_t hashBegin;
|
||||
uint32_t hashEnd;
|
||||
int32_t vgId;
|
||||
char dbname[TSDB_DB_FNAME_LEN];
|
||||
uint64_t dbId;
|
||||
int32_t cacheLastSize;
|
||||
int32_t szPage;
|
||||
int32_t szCache;
|
||||
uint64_t szBuf;
|
||||
bool isHeap;
|
||||
bool isWeak;
|
||||
int8_t cacheLast;
|
||||
int8_t isTsma;
|
||||
int8_t isRsma;
|
||||
int8_t hashMethod;
|
||||
int8_t standby;
|
||||
STsdbCfg tsdbCfg;
|
||||
SWalCfg walCfg;
|
||||
SSyncCfg syncCfg;
|
||||
SVnodeStats vndStats;
|
||||
uint32_t hashBegin;
|
||||
uint32_t hashEnd;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -476,14 +476,22 @@ _err:
|
|||
|
||||
// N.B. Called by statusReq per second
|
||||
int64_t metaGetTbNum(SMeta *pMeta) {
|
||||
// TODO
|
||||
return 0;
|
||||
// num of child tables (excluding normal tables , stables and others)
|
||||
|
||||
/* int64_t num = 0; */
|
||||
/* vnodeGetAllCtbNum(pMeta->pVnode, &num); */
|
||||
|
||||
return pMeta->pVnode->config.vndStats.numOfCTables;
|
||||
}
|
||||
|
||||
// N.B. Called by statusReq per second
|
||||
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
|
||||
// TODO
|
||||
return 400;
|
||||
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
|
||||
int64_t num = 0;
|
||||
vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
|
||||
pMeta->pVnode->config.vndStats.numOfTimeSeries = num;
|
||||
|
||||
return pMeta->pVnode->config.vndStats.numOfTimeSeries;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -202,6 +202,8 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
|
||||
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||
|
||||
++pMeta->pVnode->config.vndStats.numOfSTables;
|
||||
|
||||
metaDebug("vgId:%d, super table is created, name:%s uid: %" PRId64, TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
|
||||
|
||||
return 0;
|
||||
|
@ -394,6 +396,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
|||
me.ctbEntry.comment = pReq->comment;
|
||||
me.ctbEntry.suid = pReq->ctb.suid;
|
||||
me.ctbEntry.pTags = pReq->ctb.pTag;
|
||||
|
||||
++pMeta->pVnode->config.vndStats.numOfCTables;
|
||||
} else {
|
||||
me.ntbEntry.ctime = pReq->ctime;
|
||||
me.ntbEntry.ttlDays = pReq->ttl;
|
||||
|
@ -401,6 +405,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
|||
me.ntbEntry.comment = pReq->comment;
|
||||
me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
|
||||
me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
|
||||
|
||||
++pMeta->pVnode->config.vndStats.numOfNTables;
|
||||
}
|
||||
|
||||
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||
|
@ -534,11 +540,17 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
|||
|
||||
if (e.type == TSDB_CHILD_TABLE) {
|
||||
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn);
|
||||
|
||||
--pMeta->pVnode->config.vndStats.numOfCTables;
|
||||
} else if (e.type == TSDB_NORMAL_TABLE) {
|
||||
// drop schema.db (todo)
|
||||
|
||||
--pMeta->pVnode->config.vndStats.numOfNTables;
|
||||
} else if (e.type == TSDB_SUPER_TABLE) {
|
||||
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), &pMeta->txn);
|
||||
// drop schema.db (todo)
|
||||
|
||||
--pMeta->pVnode->config.vndStats.numOfSTables;
|
||||
}
|
||||
|
||||
tDecoderClear(&dc);
|
||||
|
|
|
@ -112,6 +112,12 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
|
||||
if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;
|
||||
|
||||
if (tjsonAddIntegerToObject(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "vndStats.ntables", pCfg->vndStats.numOfNTables) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries) < 0) return -1;
|
||||
|
||||
SJson *pNodeInfoArr = tjsonCreateArray();
|
||||
tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", pNodeInfoArr);
|
||||
for (int i = 0; i < pCfg->syncCfg.replicaNum; ++i) {
|
||||
|
@ -210,6 +216,15 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code);
|
||||
if (code < 0) return -1;
|
||||
|
||||
tjsonGetNumberValue(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "vndStats.ntables", pCfg->vndStats.numOfNTables, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries, code);
|
||||
if (code < 0) return -1;
|
||||
|
||||
SJson *pNodeInfoArr = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo");
|
||||
int arraySize = tjsonGetArraySize(pNodeInfoArr);
|
||||
assert(arraySize == pCfg->syncCfg.replicaNum);
|
||||
|
|
|
@ -30,7 +30,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
SRpcMsg rpcMsg = {0};
|
||||
int32_t code = 0;
|
||||
int32_t rspLen = 0;
|
||||
void * pRsp = NULL;
|
||||
void *pRsp = NULL;
|
||||
SSchemaWrapper schema = {0};
|
||||
SSchemaWrapper schemaTag = {0};
|
||||
|
||||
|
@ -104,7 +104,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
} else {
|
||||
pRsp = taosMemoryCalloc(1, rspLen);
|
||||
}
|
||||
|
||||
|
||||
if (pRsp == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
|
@ -127,7 +127,7 @@ _exit:
|
|||
} else {
|
||||
*pMsg = rpcMsg;
|
||||
}
|
||||
|
||||
|
||||
taosMemoryFree(metaRsp.pSchemas);
|
||||
metaReaderClear(&mer2);
|
||||
metaReaderClear(&mer1);
|
||||
|
@ -143,7 +143,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
SRpcMsg rpcMsg = {0};
|
||||
int32_t code = 0;
|
||||
int32_t rspLen = 0;
|
||||
void * pRsp = NULL;
|
||||
void *pRsp = NULL;
|
||||
SSchemaWrapper schema = {0};
|
||||
SSchemaWrapper schemaTag = {0};
|
||||
|
||||
|
@ -246,7 +246,7 @@ _exit:
|
|||
} else {
|
||||
*pMsg = rpcMsg;
|
||||
}
|
||||
|
||||
|
||||
tFreeSTableCfgRsp(&cfgRsp);
|
||||
metaReaderClear(&mer2);
|
||||
metaReaderClear(&mer1);
|
||||
|
@ -254,38 +254,38 @@ _exit:
|
|||
}
|
||||
|
||||
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
int32_t offset = 0;
|
||||
int32_t rspSize = 0;
|
||||
SBatchReq *batchReq = (SBatchReq*)pMsg->pCont;
|
||||
int32_t msgNum = ntohl(batchReq->msgNum);
|
||||
int32_t code = 0;
|
||||
int32_t offset = 0;
|
||||
int32_t rspSize = 0;
|
||||
SBatchReq *batchReq = (SBatchReq *)pMsg->pCont;
|
||||
int32_t msgNum = ntohl(batchReq->msgNum);
|
||||
offset += sizeof(SBatchReq);
|
||||
SBatchMsg req = {0};
|
||||
SBatchRsp rsp = {0};
|
||||
SRpcMsg reqMsg = *pMsg;
|
||||
SRpcMsg rspMsg = {0};
|
||||
void* pRsp = NULL;
|
||||
SRpcMsg reqMsg = *pMsg;
|
||||
SRpcMsg rspMsg = {0};
|
||||
void *pRsp = NULL;
|
||||
|
||||
SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
||||
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
||||
if (NULL == batchRsp) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
||||
for (int32_t i = 0; i < msgNum; ++i) {
|
||||
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
||||
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
||||
offset += sizeof(req.msgType);
|
||||
|
||||
req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
||||
req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
||||
offset += sizeof(req.msgLen);
|
||||
|
||||
req.msg = (char*)pMsg->pCont + offset;
|
||||
req.msg = (char *)pMsg->pCont + offset;
|
||||
offset += req.msgLen;
|
||||
|
||||
reqMsg.msgType = req.msgType;
|
||||
reqMsg.pCont = req.msg;
|
||||
reqMsg.contLen = req.msgLen;
|
||||
|
||||
|
||||
switch (req.msgType) {
|
||||
case TDMT_VND_TABLE_META:
|
||||
vnodeGetTableMeta(pVnode, &reqMsg, false);
|
||||
|
@ -305,7 +305,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
rsp.msgLen = reqMsg.contLen;
|
||||
rsp.rspCode = reqMsg.code;
|
||||
rsp.msg = reqMsg.pCont;
|
||||
|
||||
|
||||
taosArrayPush(batchRsp, &rsp);
|
||||
|
||||
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
|
||||
|
@ -313,25 +313,25 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
rspSize += sizeof(int32_t);
|
||||
offset = 0;
|
||||
|
||||
|
||||
pRsp = rpcMallocCont(rspSize);
|
||||
if (pRsp == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(msgNum);
|
||||
*(int32_t *)((char *)pRsp + offset) = htonl(msgNum);
|
||||
offset += sizeof(msgNum);
|
||||
for (int32_t i = 0; i < msgNum; ++i) {
|
||||
SBatchRsp *p = taosArrayGet(batchRsp, i);
|
||||
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
|
||||
|
||||
*(int32_t *)((char *)pRsp + offset) = htonl(p->reqType);
|
||||
offset += sizeof(p->reqType);
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
|
||||
*(int32_t *)((char *)pRsp + offset) = htonl(p->msgLen);
|
||||
offset += sizeof(p->msgLen);
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
|
||||
*(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode);
|
||||
offset += sizeof(p->rspCode);
|
||||
memcpy((char*)pRsp + offset, p->msg, p->msgLen);
|
||||
memcpy((char *)pRsp + offset, p->msg, p->msgLen);
|
||||
offset += p->msgLen;
|
||||
|
||||
taosMemoryFreeClear(p->msg);
|
||||
|
@ -418,6 +418,85 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
|
||||
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid);
|
||||
if (!pCur) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
*num = 0;
|
||||
while (1) {
|
||||
tb_uid_t id = metaCtbCursorNext(pCur);
|
||||
if (id == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
++(*num);
|
||||
}
|
||||
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
|
||||
STSchema *pTSchema = metaGetTbTSchema(pVnode->pMeta, suid, -1);
|
||||
// metaGetTbTSchemaEx(pVnode->pMeta, suid, suid, -1, &pTSchema);
|
||||
|
||||
*num = pTSchema->numOfCols;
|
||||
|
||||
taosMemoryFree(pTSchema);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) {
|
||||
SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, 0);
|
||||
if (!pCur) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
*num = 0;
|
||||
while (1) {
|
||||
tb_uid_t id = metaStbCursorNext(pCur);
|
||||
if (id == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
int64_t ctbNum = 0;
|
||||
vnodeGetCtbNum(pVnode, id, &ctbNum);
|
||||
int numOfCols = 0;
|
||||
vnodeGetStbColumnNum(pVnode, id, &numOfCols);
|
||||
|
||||
*num += ctbNum * numOfCols;
|
||||
}
|
||||
|
||||
metaCloseStbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num) {
|
||||
SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, 0);
|
||||
if (!pCur) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
*num = 0;
|
||||
while (1) {
|
||||
tb_uid_t id = metaStbCursorNext(pCur);
|
||||
if (id == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
int64_t ctbNum = 0;
|
||||
vnodeGetCtbNum(pVnode, id, &ctbNum);
|
||||
|
||||
*num += ctbNum;
|
||||
}
|
||||
|
||||
metaCloseStbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void *vnodeGetIdx(SVnode *pVnode) {
|
||||
if (pVnode == NULL) {
|
||||
return NULL;
|
||||
|
|
Loading…
Reference in New Issue