From 94b5031554a6bcd11e0f1354e44b77c0c2a34cc9 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 13 Apr 2022 10:48:26 +0800 Subject: [PATCH] feature/qnode --- include/util/tdef.h | 7 ++++++ source/client/inc/clientInt.h | 3 +-- source/dnode/mnode/impl/inc/mndInt.h | 5 ++++ source/dnode/mnode/impl/src/mndInfoSchema.c | 5 +--- source/dnode/mnode/impl/src/mndStb.c | 6 +++++ source/dnode/mnode/impl/src/mnode.c | 2 ++ source/libs/catalog/inc/catalogInt.h | 8 +++---- source/libs/catalog/src/catalog.c | 26 ++++++++++----------- tools/shell/src/shellEngine.c | 2 ++ 9 files changed, 41 insertions(+), 23 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 263c90c0f8..d66e555f8c 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -113,6 +113,13 @@ extern const int32_t TYPE_BYTES[15]; #define TSDB_INS_TABLE_BNODES "bnodes" #define TSDB_INS_TABLE_SNODES "snodes" +#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" +#define TSDB_PERFS_TABLE_CONNECTIONS "connections" +#define TSDB_PERFS_TABLE_QUERIES "queries" +#define TSDB_PERFS_TABLE_TOPICS "topics" +#define TSDB_PERFS_TABLE_CONSUMERS "consumers" +#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" + #define TSDB_INDEX_TYPE_SMA "SMA" #define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT" diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 4ec01e5693..4e3299fd1b 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -43,8 +43,7 @@ extern "C" { } \ } while (0) -//#define HEARTBEAT_INTERVAL 1500 // ms -#define HEARTBEAT_INTERVAL 15000 // ms TODO +#define HEARTBEAT_INTERVAL 1500 // ms typedef struct SAppInstInfo SAppInstInfo; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index ad42eebc1b..5c72e47cc4 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -38,6 +38,10 @@ extern "C" { #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} +#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) +#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) +#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) + typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); @@ -118,6 +122,7 @@ typedef struct SMnode { STelemMgmt telemMgmt; SSyncMgmt syncMgmt; SHashObj *infosMeta; + SHashObj *perfsMeta; SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; SMsgCb msgCb; diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index d60e7f5cf9..990f5fb70d 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -15,10 +15,7 @@ #define _DEFAULT_SOURCE #include "mndInfoSchema.h" - -#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) -#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) -#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) +#include "mndInt.h" //!!!! Note: only APPEND columns in below tables, NO insert !!!! static const SInfosTableSchema dnodesSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index afcea6e732..e6c5aa030b 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -18,6 +18,7 @@ #include "mndDb.h" #include "mndDnode.h" #include "mndInfoSchema.h" +#include "mndPerfSchema.h" #include "mndMnode.h" #include "mndShow.h" #include "mndTrans.h" @@ -1516,6 +1517,11 @@ static int32_t mndProcessTableMetaReq(SNodeMsg *pReq) { if (mndBuildInsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { goto RETRIEVE_META_OVER; } + } else if (0 == strcmp(infoReq.dbFName, TSDB_PERFORMANCE_SCHEMA_DB)) { + mDebug("performance_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName); + if (mndBuildPerfsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { + goto RETRIEVE_META_OVER; + } } else { mDebug("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName); if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 5a1780530c..7d1905a53e 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -23,6 +23,7 @@ #include "mndDnode.h" #include "mndFunc.h" #include "mndInfoSchema.h" +#include "mndPerfSchema.h" #include "mndMnode.h" #include "mndOffset.h" #include "mndProfile.h" @@ -208,6 +209,7 @@ static int32_t mndInitSteps(SMnode *pMnode, bool deploy) { if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitSma, mndCleanupSma) != 0) return -1; if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (deploy) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 09f51dc03e..8938084724 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -227,20 +227,20 @@ typedef struct SCtgAction { #define CTG_FLAG_STB 0x1 #define CTG_FLAG_NOT_STB 0x2 #define CTG_FLAG_UNKNOWN_STB 0x4 -#define CTG_FLAG_INF_DB 0x8 +#define CTG_FLAG_SYS_DB 0x8 #define CTG_FLAG_FORCE_UPDATE 0x10 #define CTG_FLAG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB) #define CTG_FLAG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB) #define CTG_FLAG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB) -#define CTG_FLAG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB) +#define CTG_FLAG_IS_SYS_DB(_flag) ((_flag) & CTG_FLAG_SYS_DB) #define CTG_FLAG_IS_FORCE_UPDATE(_flag) ((_flag) & CTG_FLAG_FORCE_UPDATE) -#define CTG_FLAG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB) +#define CTG_FLAG_SET_SYS_DB(_flag) ((_flag) |= CTG_FLAG_SYS_DB) #define CTG_FLAG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0) #define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)) #define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) -#define CTG_IS_INF_DBNAME(_dbname) ((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) +#define CTG_IS_SYS_DBNAME(_dbname) (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB)))) #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2aa858fe06..21e0be40a4 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -217,7 +217,7 @@ int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) } char *p = strchr(dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { + if (p && CTG_IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -304,7 +304,7 @@ int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t d } char *p = strchr(dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { + if (p && CTG_IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -336,7 +336,7 @@ int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, boo } char *p = strchr(output->dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { + if (p && CTG_IS_SYS_DBNAME(p + 1)) { memmove(output->dbFName, p + 1, strlen(p + 1)); } @@ -410,7 +410,7 @@ void ctgWReleaseVgInfo(SCtgDBCache *dbCache) { int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { char *p = strchr(dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { + if (p && CTG_IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -688,7 +688,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable } char dbFName[TSDB_DB_FNAME_LEN] = {0}; - if (CTG_FLAG_IS_INF_DB(flag)) { + if (CTG_FLAG_IS_SYS_DB(flag)) { strcpy(dbFName, pTableName->dbname); } else { tNameGetFullDbName(pTableName, dbFName); @@ -1721,7 +1721,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SVgroupInfo vgroupInfo = {0}; int32_t code = 0; - if (!CTG_FLAG_IS_INF_DB(flag)) { + if (!CTG_FLAG_IS_SYS_DB(flag)) { CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo)); } @@ -1732,7 +1732,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - if (CTG_FLAG_IS_INF_DB(flag)) { + if (CTG_FLAG_IS_SYS_DB(flag)) { ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(pTableName)); CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, (char *)pTableName->dbname, (char *)pTableName->tname, output)); @@ -1820,8 +1820,8 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons uint64_t suid = 0; STableMetaOutput *output = NULL; - if (CTG_IS_INF_DBNAME(pTableName->dbname)) { - CTG_FLAG_SET_INF_DB(flag); + if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { + CTG_FLAG_SET_SYS_DB(flag); } CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &inCache, flag, &dbId)); @@ -1829,7 +1829,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons int32_t tbType = 0; if (inCache) { - if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_INF_DB(flag)))) { + if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_SYS_DB(flag)))) { goto _return; } @@ -1885,7 +1885,7 @@ _return: if (CTG_TABLE_NOT_EXIST(code) && inCache) { char dbFName[TSDB_DB_FNAME_LEN] = {0}; - if (CTG_FLAG_IS_INF_DB(flag)) { + if (CTG_FLAG_IS_SYS_DB(flag)) { strcpy(dbFName, pTableName->dbname); } else { tNameGetFullDbName(pTableName, dbFName); @@ -2633,7 +2633,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - if (CTG_IS_INF_DBNAME(pTableName->dbname)) { + if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -2666,7 +2666,7 @@ _return: int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) { CTG_API_ENTER(); - if (CTG_IS_INF_DBNAME(pTableName->dbname)) { + if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index b89d517ad3..d883362636 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -344,6 +344,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { atomic_store_64(&result, 0); freeResultWithRid(oresult); + taos_free_result(pSql); + return; }