From dc576905f03572cbd171ab43f375671f8fcf351f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 5 Nov 2024 19:57:13 +0800 Subject: [PATCH] support disk usage --- include/libs/executor/storageapi.h | 4 +- source/dnode/vnode/inc/vnode.h | 68 +++++++++++----------- source/dnode/vnode/src/vnd/vnodeInitApi.c | 1 + source/dnode/vnode/src/vnd/vnodeQuery.c | 32 ++++++++++ source/libs/executor/src/sysscanoperator.c | 13 +++-- 5 files changed, 80 insertions(+), 38 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 8e88a1a278..02ef4f01bd 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -279,6 +279,7 @@ typedef struct SStoreMeta { int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); + int32_t (*getDBSize)(void* pVnode, int64_t* dataSize, int64_t* walSize, int64_t* metaSize); SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock); int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); @@ -382,7 +383,8 @@ typedef struct SStateStore { int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen, int32_t* pWinCode); - int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen); + int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, + int32_t* pVLen); int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 610ba43673..5c1bf829a9 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -80,6 +80,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeIsCatchUp(SVnode *pVnode); ESyncRole vnodeGetRole(SVnode *pVnode); int32_t vnodeGetArbToken(SVnode *pVnode, char *outToken); +int32_t vnodeGetDBSize(void *pVnode, int64_t *dataSize, int64_t *walSize, int64_t *metaSize); int32_t vnodeUpdateArbTerm(SVnode *pVnode, int64_t arbTerm); @@ -108,7 +109,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo); +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); @@ -161,25 +162,25 @@ typedef struct STsdbReader STsdbReader; #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 -int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, - SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables); -int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num); -int32_t tsdbReaderSetId(void *pReader, const char *idstr); -void tsdbReaderClose2(STsdbReader *pReader); -int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext); -int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA); -void tsdbReleaseDataBlock2(STsdbReader *pReader); -int32_t tsdbRetrieveDataBlock2(STsdbReader *pReader, SSDataBlock **pBlock, SArray *pIdList); -int32_t tsdbReaderReset2(STsdbReader *pReader, SQueryTableDataCond *pCond); -int32_t tsdbGetFileBlocksDistInfo2(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); -int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle); -void *tsdbGetIdx2(SMeta *pMeta); -void *tsdbGetIvtIdx2(SMeta *pMeta); -uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); -void tsdbReaderSetCloseFlag(STsdbReader *pReader); -int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); -void tsdbSetFilesetDelimited(STsdbReader *pReader); -void tsdbReaderSetNotifyCb(STsdbReader *pReader, TsdReaderNotifyCbFn notifyFn, void *param); +int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, + SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables); +int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num); +int32_t tsdbReaderSetId(void *pReader, const char *idstr); +void tsdbReaderClose2(STsdbReader *pReader); +int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext); +int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA); +void tsdbReleaseDataBlock2(STsdbReader *pReader); +int32_t tsdbRetrieveDataBlock2(STsdbReader *pReader, SSDataBlock **pBlock, SArray *pIdList); +int32_t tsdbReaderReset2(STsdbReader *pReader, SQueryTableDataCond *pCond); +int32_t tsdbGetFileBlocksDistInfo2(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); +int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle); +void *tsdbGetIdx2(SMeta *pMeta); +void *tsdbGetIvtIdx2(SMeta *pMeta); +uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); +void tsdbReaderSetCloseFlag(STsdbReader *pReader); +int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); +void tsdbSetFilesetDelimited(STsdbReader *pReader); +void tsdbReaderSetNotifyCb(STsdbReader *pReader, TsdReaderNotifyCbFn notifyFn, void *param); int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, @@ -224,10 +225,10 @@ void tqReaderClose(STqReader *); bool tqGetTablePrimaryKey(STqReader *pReader); void tqSetTablePrimaryKey(STqReader *pReader, int64_t uid); -void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); -void tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char *id); -void tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); -void tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); +void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); +void tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList, const char *id); +void tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); +void tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); bool tqReaderIsQueriedTable(STqReader *pReader, uint64_t uid); bool tqCurrentBlockConsumed(const STqReader *pReader); @@ -243,7 +244,8 @@ int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, con int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); -int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, int64_t *createTime); +int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, + int64_t *createTime); int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished); // sma @@ -258,14 +260,14 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); -bool taosXGetTablePrimaryKey(SSnapContext *ctx); -void taosXSetTablePrimaryKey(SSnapContext *ctx, int64_t uid); -int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta, - SSnapContext **ctxRet); -int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); -int32_t getMetaTableInfoFromSnapshot(SSnapContext *ctx, SMetaTableInfo* info); -int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); -void destroySnapContext(SSnapContext *ctx); +bool taosXGetTablePrimaryKey(SSnapContext *ctx); +void taosXSetTablePrimaryKey(SSnapContext *ctx, int64_t uid); +int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta, + SSnapContext **ctxRet); +int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); +int32_t getMetaTableInfoFromSnapshot(SSnapContext *ctx, SMetaTableInfo *info); +int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); +void destroySnapContext(SSnapContext *ctx); // structs struct STsdbCfg { diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 59a129cac8..3f4dde23d3 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -105,6 +105,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->pauseCtbCursor = metaPauseCtbCursor; pMeta->closeCtbCursor = metaCloseCtbCursor; pMeta->ctbCursorNext = metaCtbCursorNext; + pMeta->getDBSize = vnodeGetDBSize; } void initTqAPI(SStoreTqReader* pTq) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0929953e1c..2eb28447f2 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -870,6 +870,38 @@ int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64 return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid); } +int32_t vnodeGetDBSize(void *pVnode, int64_t *dataSize, int64_t *walSize, int64_t *metaSize) { + SVnode *pVnodeObj = pVnode; + if (pVnodeObj == NULL) { + return TSDB_CODE_VND_NOT_EXIST; + } + int32_t code = 0; + char path[TSDB_FILENAME_LEN] = {0}; + + char *dirName[] = {VNODE_TSDB_DIR, VNODE_WAL_DIR, VNODE_META_DIR}; + int64_t dirSize[3]; + + vnodeGetPrimaryDir(pVnodeObj->path, pVnodeObj->diskPrimary, pVnodeObj->pTfs, path, TSDB_FILENAME_LEN); + int32_t offset = strlen(path); + + SDiskSize size = {0}; + for (int i = 0; i < sizeof(dirName) / sizeof(dirName[0]); i++) { + (void)snprintf(path + offset, TSDB_FILENAME_LEN, "%s%s", TD_DIRSEP, dirName[i]); + code = taosGetDiskSize(path, &size); + if (code != 0) { + return code; + } + path[offset] = 0; + dirSize[i] = size.used; + memset(&size, 0, sizeof(size)); + } + + *dataSize = dirSize[0]; + *walSize = dirSize[1]; + *metaSize = dirSize[2]; + return 0; +} + int32_t vnodeGetStreamProgress(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { int32_t code = 0; SStreamProgressReq req; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 619d1206a7..ff32180d88 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2044,9 +2044,18 @@ static SSDataBlock* sysTableBuildVgUsage(SOperatorInfo* pOperator) { int32_t numOfRows = 0; const char* db = NULL; + int64_t totalSize = 0; + int32_t numOfCols = 0; int32_t vgId = 0; + int64_t dbSize = 0; + int64_t walSize = 0; + int64_t metaSize = 0; + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL); + code = pAPI->metaFn.getDBSize(pInfo->readHandle.vnode, &dbSize, &walSize, &metaSize); + QUERY_CHECK_CODE(code, lino, _end); + SName sn = {0}; char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; code = tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB); @@ -2065,10 +2074,6 @@ static SSDataBlock* sysTableBuildVgUsage(SOperatorInfo* pOperator) { char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - int64_t walSize = 1024, totalSize = 0; - int32_t numOfCols = 0; - // SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++); - SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, numOfCols++); code = colDataSetVal(pColInfoData, numOfRows, dbname, false); QUERY_CHECK_CODE(code, lino, _end);