support disk usage

This commit is contained in:
yihaoDeng 2024-11-05 19:57:13 +08:00
parent ae8ac7e308
commit dc576905f0
5 changed files with 80 additions and 38 deletions

View File

@ -279,6 +279,7 @@ typedef struct SStoreMeta {
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables);
int32_t (*getDBSize)(void* pVnode, int64_t* dataSize, int64_t* walSize, int64_t* metaSize);
SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock);
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
@ -382,7 +383,8 @@ typedef struct SStateStore {
int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount,
void** ppVal, int32_t* pVLen, int32_t* pWinCode);
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
int32_t* pVLen);
int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType,
int32_t pkLen, SUpdateInfo** ppInfo);

View File

@ -80,6 +80,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeIsCatchUp(SVnode *pVnode);
ESyncRole vnodeGetRole(SVnode *pVnode);
int32_t vnodeGetArbToken(SVnode *pVnode, char *outToken);
int32_t vnodeGetDBSize(void *pVnode, int64_t *dataSize, int64_t *walSize, int64_t *metaSize);
int32_t vnodeUpdateArbTerm(SVnode *pVnode, int64_t arbTerm);
@ -108,7 +109,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
@ -161,25 +162,25 @@ typedef struct STsdbReader STsdbReader;
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables);
int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderSetId(void *pReader, const char *idstr);
void tsdbReaderClose2(STsdbReader *pReader);
int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA);
void tsdbReleaseDataBlock2(STsdbReader *pReader);
int32_t tsdbRetrieveDataBlock2(STsdbReader *pReader, SSDataBlock **pBlock, SArray *pIdList);
int32_t tsdbReaderReset2(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo2(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle);
void *tsdbGetIdx2(SMeta *pMeta);
void *tsdbGetIvtIdx2(SMeta *pMeta);
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
void tsdbSetFilesetDelimited(STsdbReader *pReader);
void tsdbReaderSetNotifyCb(STsdbReader *pReader, TsdReaderNotifyCbFn notifyFn, void *param);
int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables);
int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderSetId(void *pReader, const char *idstr);
void tsdbReaderClose2(STsdbReader *pReader);
int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA);
void tsdbReleaseDataBlock2(STsdbReader *pReader);
int32_t tsdbRetrieveDataBlock2(STsdbReader *pReader, SSDataBlock **pBlock, SArray *pIdList);
int32_t tsdbReaderReset2(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo2(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle);
void *tsdbGetIdx2(SMeta *pMeta);
void *tsdbGetIvtIdx2(SMeta *pMeta);
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
void tsdbSetFilesetDelimited(STsdbReader *pReader);
void tsdbReaderSetNotifyCb(STsdbReader *pReader, TsdReaderNotifyCbFn notifyFn, void *param);
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
@ -224,10 +225,10 @@ void tqReaderClose(STqReader *);
bool tqGetTablePrimaryKey(STqReader *pReader);
void tqSetTablePrimaryKey(STqReader *pReader, int64_t uid);
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
void tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char *id);
void tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
void tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
void tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char *id);
void tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
void tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
bool tqReaderIsQueriedTable(STqReader *pReader, uint64_t uid);
bool tqCurrentBlockConsumed(const STqReader *pReader);
@ -243,7 +244,8 @@ int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, con
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, int64_t *createTime);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet,
int64_t *createTime);
int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished);
// sma
@ -258,14 +260,14 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
bool taosXGetTablePrimaryKey(SSnapContext *ctx);
void taosXSetTablePrimaryKey(SSnapContext *ctx, int64_t uid);
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext **ctxRet);
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
int32_t getMetaTableInfoFromSnapshot(SSnapContext *ctx, SMetaTableInfo* info);
int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
void destroySnapContext(SSnapContext *ctx);
bool taosXGetTablePrimaryKey(SSnapContext *ctx);
void taosXSetTablePrimaryKey(SSnapContext *ctx, int64_t uid);
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext **ctxRet);
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
int32_t getMetaTableInfoFromSnapshot(SSnapContext *ctx, SMetaTableInfo *info);
int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
void destroySnapContext(SSnapContext *ctx);
// structs
struct STsdbCfg {

View File

@ -105,6 +105,7 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->pauseCtbCursor = metaPauseCtbCursor;
pMeta->closeCtbCursor = metaCloseCtbCursor;
pMeta->ctbCursorNext = metaCtbCursorNext;
pMeta->getDBSize = vnodeGetDBSize;
}
void initTqAPI(SStoreTqReader* pTq) {

View File

@ -870,6 +870,38 @@ int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64
return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid);
}
int32_t vnodeGetDBSize(void *pVnode, int64_t *dataSize, int64_t *walSize, int64_t *metaSize) {
SVnode *pVnodeObj = pVnode;
if (pVnodeObj == NULL) {
return TSDB_CODE_VND_NOT_EXIST;
}
int32_t code = 0;
char path[TSDB_FILENAME_LEN] = {0};
char *dirName[] = {VNODE_TSDB_DIR, VNODE_WAL_DIR, VNODE_META_DIR};
int64_t dirSize[3];
vnodeGetPrimaryDir(pVnodeObj->path, pVnodeObj->diskPrimary, pVnodeObj->pTfs, path, TSDB_FILENAME_LEN);
int32_t offset = strlen(path);
SDiskSize size = {0};
for (int i = 0; i < sizeof(dirName) / sizeof(dirName[0]); i++) {
(void)snprintf(path + offset, TSDB_FILENAME_LEN, "%s%s", TD_DIRSEP, dirName[i]);
code = taosGetDiskSize(path, &size);
if (code != 0) {
return code;
}
path[offset] = 0;
dirSize[i] = size.used;
memset(&size, 0, sizeof(size));
}
*dataSize = dirSize[0];
*walSize = dirSize[1];
*metaSize = dirSize[2];
return 0;
}
int32_t vnodeGetStreamProgress(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
int32_t code = 0;
SStreamProgressReq req;

View File

@ -2044,9 +2044,18 @@ static SSDataBlock* sysTableBuildVgUsage(SOperatorInfo* pOperator) {
int32_t numOfRows = 0;
const char* db = NULL;
int64_t totalSize = 0;
int32_t numOfCols = 0;
int32_t vgId = 0;
int64_t dbSize = 0;
int64_t walSize = 0;
int64_t metaSize = 0;
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
code = pAPI->metaFn.getDBSize(pInfo->readHandle.vnode, &dbSize, &walSize, &metaSize);
QUERY_CHECK_CODE(code, lino, _end);
SName sn = {0};
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
code = tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
@ -2065,10 +2074,6 @@ static SSDataBlock* sysTableBuildVgUsage(SOperatorInfo* pOperator) {
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
int64_t walSize = 1024, totalSize = 0;
int32_t numOfCols = 0;
// SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++);
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
QUERY_CHECK_CODE(code, lino, _end);