ins_tsmas

This commit is contained in:
wangjiaming0909 2023-11-27 16:59:58 +08:00
parent 08fc9170f5
commit f77cd43d26
9 changed files with 146 additions and 2 deletions

View File

@ -56,6 +56,7 @@ extern "C" {
#define TSDB_INS_TABLE_GRANTS_FULL "ins_grants_full"
#define TSDB_INS_TABLE_GRANTS_LOGS "ins_grants_logs"
#define TSDB_INS_TABLE_MACHINES "ins_machines"
#define TSDB_INS_TABLE_TSMAS "ins_tsmas"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "perf_smas"

View File

@ -145,6 +145,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_STREAM_TASKS,
TSDB_MGMT_TABLE_PRIVILEGES,
TSDB_MGMT_TABLE_VIEWS,
TSDB_MGMT_TABLE_TSMAS,
TSDB_MGMT_TABLE_COMPACT,
TSDB_MGMT_TABLE_COMPACT_DETAIL,
TSDB_MGMT_TABLE_GRANTS_FULL,
@ -369,9 +370,9 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_GRANTS_FULL_STMT,
QUERY_NODE_SHOW_GRANTS_LOGS_STMT,
QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT,
QUERY_NODE_SHOW_TSMAS_STMT,
QUERY_NODE_CREATE_TSMA_STMT,
QUERY_NODE_SHOW_CREATE_TSMA_STMT,
QUERY_NODE_SHOW_TSMAS_STMT,
QUERY_NODE_DROP_TSMA_STMT,
// logic plan node

View File

@ -372,6 +372,18 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = {
{.name = "finished", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
};
static const SSysDbTableSchema tsmaSchema[] = {
{.name = "tsma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "target_stb", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "interval", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "create_sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "func_list", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema userGrantsFullSchema[] = {
{.name = "grant_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
@ -427,6 +439,7 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_GRANTS_LOGS, userGrantsLogsSchema, tListLen(userGrantsLogsSchema), true},
{TSDB_INS_TABLE_MACHINES, userMachinesSchema, tListLen(userMachinesSchema), true},
{TSDB_INS_TABLE_ARBGROUPS, arbGroupsSchema, tListLen(arbGroupsSchema), true},
{TSDB_INS_TABLE_TSMAS, tsmaSchema, tListLen(tsmaSchema), false},
};
static const SSysDbTableSchema connectionsSchema[] = {

View File

@ -131,6 +131,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_GRANTS_LOGS;
} else if (strncasecmp(name, TSDB_INS_TABLE_MACHINES, len) == 0) {
type = TSDB_MGMT_TABLE_MACHINES;
} else if (strncasecmp(name, TSDB_INS_TABLE_TSMAS, len) == 0) {
type = TSDB_MGMT_TABLE_TSMAS;
} else {
mError("invalid show name:%s len:%d", name, len);
}

View File

@ -31,6 +31,7 @@
#include "mndVgroup.h"
#include "parser.h"
#include "tname.h"
#include "functionMgt.h"
#define TSDB_SMA_VER_NUMBER 1
#define TSDB_SMA_RESERVE_SIZE 64
@ -55,6 +56,9 @@ static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter);
typedef struct SCreateTSMACxt {
SMnode * pMnode;
const SRpcMsg * pRpcReq;
@ -95,6 +99,8 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA);
return sdbSetTable(pMnode->pSdb, table);
}
@ -1785,3 +1791,111 @@ _OVER:
mndReleaseDb(pMnode, pDb);
return code;
}
static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SDbObj * pDb = NULL;
int32_t numOfRows = 0;
SSmaObj * pSma = NULL;
SMnode * pMnode = pReq->info.node;
SColumnInfoData *pColInfo;
if (pShow->db[0]) {
pDb = mndAcquireDb(pMnode, pShow->db);
}
if (pShow->pIter == NULL) {
pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
}
SSmaAndTagIter *pIter = pShow->pIter;
while (numOfRows < rows) {
pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
if (pIter->pSmaIter == NULL) break;
if (pDb && pSma->dbUid != pDb->uid) {
sdbRelease(pMnode->pSdb, pSma);
continue;
}
int32_t cols = 0;
SName n = {0};
tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false);
char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)db, false);
tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)srcTb, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)db, false);
tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(targetTb, (char*)tNameGetTableName(&n));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)targetTb, false);
// stream name
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)smaName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)(&pSma->createdTime), false);
// interval
// TODO replace 64
char interval[64 + VARSTR_HEADER_SIZE] = {0};
SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db);
int32_t len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
getPrecisionUnit(pSrcDb->cfg.precision));
varDataSetLen(interval, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, interval, false);
// create sql
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char buf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
len = snprintf(buf + 2, TSDB_SHOW_SQL_LEN, "%s", pSma->sql);
varDataSetLen(buf, len);
colDataSetVal(pColInfo, numOfRows, buf, false);
// func list
len = 0;
char * start = buf + VARSTR_HEADER_SIZE;
SNode *pNode = NULL, *pFunc = NULL;
nodesStringToNode(pSma->ast, &pNode);
if (pNode) {
FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) {
if (nodeType(pFunc) == QUERY_NODE_FUNCTION) {
SFunctionNode *pFuncNode = (SFunctionNode *)pFunc;
if (!fmIsAggFunc(pFuncNode->funcId)) continue;
len += snprintf(start, TSDB_SHOW_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "",
((SExprNode *)pFunc)->userAlias);
start = buf + VARSTR_HEADER_SIZE + len;
}
}
}
varDataSetLen(buf, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, buf, false);
numOfRows++;
mndReleaseSma(pMnode, pSma);
mndReleaseDb(pMnode, pSrcDb);
}
mndReleaseDb(pMnode, pDb);
pShow->numOfRows += numOfRows;
if (numOfRows < rows) pShow->pIter = NULL;
return numOfRows;
}
static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
}

View File

@ -107,6 +107,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
// for msgs inside mnode
// TODO change the name
mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode);
mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);

View File

@ -1890,7 +1890,7 @@ static bool needDbShowStmt(ENodeType type) {
return QUERY_NODE_SHOW_TABLES_STMT == type || QUERY_NODE_SHOW_STABLES_STMT == type ||
QUERY_NODE_SHOW_VGROUPS_STMT == type || QUERY_NODE_SHOW_INDEXES_STMT == type ||
QUERY_NODE_SHOW_TAGS_STMT == type || QUERY_NODE_SHOW_TABLE_TAGS_STMT == type ||
QUERY_NODE_SHOW_VIEWS_STMT == type;
QUERY_NODE_SHOW_VIEWS_STMT == type || QUERY_NODE_SHOW_TSMAS_STMT == type;
}
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type) {

View File

@ -775,6 +775,11 @@ static int32_t collectMetaKeyFromCreateTSMAStmt(SCollectMetaKeyCxt* pCxt, SCreat
return code;
}
static int32_t collectMetaKeyFromShowTSMASStmt(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TSMAS,
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
pCxt->pStmt = pStmt;
switch (nodeType(pStmt)) {
@ -909,6 +914,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromCreateTSMAStmt(pCxt, (SCreateTSMAStmt*)pStmt);
case QUERY_NODE_DROP_TSMA_STMT:
break;
case QUERY_NODE_SHOW_TSMAS_STMT:
return collectMetaKeyFromShowTSMASStmt(pCxt, (SShowStmt*)pStmt);
default:
break;
}

View File

@ -305,6 +305,10 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
{ .showType = QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT,
.pDbName = TSDB_INFORMATION_SCHEMA_DB,
.pTableName = TSDB_INS_TABLE_MACHINES,
},
{ .showType = QUERY_NODE_SHOW_TSMAS_STMT,
.pDbName = TSDB_INFORMATION_SCHEMA_DB,
.pTableName = TSDB_INS_TABLE_TSMAS,
.numOfShowCols = 1,
.pShowCols = {"*"}
},
@ -12491,6 +12495,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_GRANTS_LOGS_STMT:
case QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT:
case QUERY_NODE_SHOW_ARBGROUPS_STMT:
case QUERY_NODE_SHOW_TSMAS_STMT:
code = rewriteShow(pCxt, pQuery);
break;
case QUERY_NODE_SHOW_VGROUPS_STMT: