diff --git a/include/common/systable.h b/include/common/systable.h index 77fd18da39..34d46953f6 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -56,6 +56,7 @@ extern "C" { #define TSDB_INS_TABLE_GRANTS_FULL "ins_grants_full" #define TSDB_INS_TABLE_GRANTS_LOGS "ins_grants_logs" #define TSDB_INS_TABLE_MACHINES "ins_machines" +#define TSDB_INS_TABLE_TSMAS "ins_tsmas" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "perf_smas" diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 102ee7973a..fc46a20d3a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -145,6 +145,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_STREAM_TASKS, TSDB_MGMT_TABLE_PRIVILEGES, TSDB_MGMT_TABLE_VIEWS, + TSDB_MGMT_TABLE_TSMAS, TSDB_MGMT_TABLE_COMPACT, TSDB_MGMT_TABLE_COMPACT_DETAIL, TSDB_MGMT_TABLE_GRANTS_FULL, @@ -369,9 +370,9 @@ typedef enum ENodeType { QUERY_NODE_SHOW_GRANTS_FULL_STMT, QUERY_NODE_SHOW_GRANTS_LOGS_STMT, QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT, + QUERY_NODE_SHOW_TSMAS_STMT, QUERY_NODE_CREATE_TSMA_STMT, QUERY_NODE_SHOW_CREATE_TSMA_STMT, - QUERY_NODE_SHOW_TSMAS_STMT, QUERY_NODE_DROP_TSMA_STMT, // logic plan node diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 678962a00a..ba692840a1 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -372,6 +372,18 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = { {.name = "finished", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, }; +static const SSysDbTableSchema tsmaSchema[] = { + {.name = "tsma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "target_stb", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "interval", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "create_sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "func_list", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +}; static const SSysDbTableSchema userGrantsFullSchema[] = { {.name = "grant_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, @@ -427,6 +439,7 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_GRANTS_LOGS, userGrantsLogsSchema, tListLen(userGrantsLogsSchema), true}, {TSDB_INS_TABLE_MACHINES, userMachinesSchema, tListLen(userMachinesSchema), true}, {TSDB_INS_TABLE_ARBGROUPS, arbGroupsSchema, tListLen(arbGroupsSchema), true}, + {TSDB_INS_TABLE_TSMAS, tsmaSchema, tListLen(tsmaSchema), false}, }; static const SSysDbTableSchema connectionsSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index b3124fa99f..8c6ab6e518 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -131,6 +131,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_GRANTS_LOGS; } else if (strncasecmp(name, TSDB_INS_TABLE_MACHINES, len) == 0) { type = TSDB_MGMT_TABLE_MACHINES; + } else if (strncasecmp(name, TSDB_INS_TABLE_TSMAS, len) == 0) { + type = TSDB_MGMT_TABLE_TSMAS; } else { mError("invalid show name:%s len:%d", name, len); } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index e25b6891c7..8887d0d446 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -31,6 +31,7 @@ #include "mndVgroup.h" #include "parser.h" #include "tname.h" +#include "functionMgt.h" #define TSDB_SMA_VER_NUMBER 1 #define TSDB_SMA_RESERVE_SIZE 64 @@ -55,6 +56,9 @@ static int32_t mndProcessDropIdxReq(SRpcMsg *pReq); static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter); +static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter); + typedef struct SCreateTSMACxt { SMnode * pMnode; const SRpcMsg * pRpcReq; @@ -95,6 +99,8 @@ int32_t mndInitSma(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndRetrieveTSMA); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TSMAS, mndCancelRetrieveTSMA); return sdbSetTable(pMnode->pSdb, table); } @@ -1785,3 +1791,111 @@ _OVER: mndReleaseDb(pMnode, pDb); return code; } + +static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + SDbObj * pDb = NULL; + int32_t numOfRows = 0; + SSmaObj * pSma = NULL; + SMnode * pMnode = pReq->info.node; + SColumnInfoData *pColInfo; + if (pShow->db[0]) { + pDb = mndAcquireDb(pMnode, pShow->db); + } + if (pShow->pIter == NULL) { + pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter)); + } + SSmaAndTagIter *pIter = pShow->pIter; + while (numOfRows < rows) { + pIter->pSmaIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma); + if (pIter->pSmaIter == NULL) break; + + if (pDb && pSma->dbUid != pDb->uid) { + sdbRelease(pMnode->pSdb, pSma); + continue; + } + + int32_t cols = 0; + SName n = {0}; + + tNameFromString(&n, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + char smaName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(smaName, (char *)tNameGetTableName(&n)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)smaName, false); + + char db[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(db, (char *)mndGetDbStr(pSma->db)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)db, false); + + tNameFromString(&n, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + char srcTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(srcTb, (char *)tNameGetTableName(&n)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)srcTb, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)db, false); + + tNameFromString(&n, pSma->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + char targetTb[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(targetTb, (char*)tNameGetTableName(&n)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)targetTb, false); + + // stream name + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)smaName, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)(&pSma->createdTime), false); + + // interval + // TODO replace 64 + char interval[64 + VARSTR_HEADER_SIZE] = {0}; + SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db); + int32_t len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval, + getPrecisionUnit(pSrcDb->cfg.precision)); + varDataSetLen(interval, len); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, interval, false); + + // create sql + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + char buf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + len = snprintf(buf + 2, TSDB_SHOW_SQL_LEN, "%s", pSma->sql); + varDataSetLen(buf, len); + colDataSetVal(pColInfo, numOfRows, buf, false); + + // func list + len = 0; + char * start = buf + VARSTR_HEADER_SIZE; + SNode *pNode = NULL, *pFunc = NULL; + nodesStringToNode(pSma->ast, &pNode); + if (pNode) { + FOREACH(pFunc, ((SSelectStmt *)pNode)->pProjectionList) { + if (nodeType(pFunc) == QUERY_NODE_FUNCTION) { + SFunctionNode *pFuncNode = (SFunctionNode *)pFunc; + if (!fmIsAggFunc(pFuncNode->funcId)) continue; + len += snprintf(start, TSDB_SHOW_SQL_LEN - len, "%s%s", start != buf + VARSTR_HEADER_SIZE ? "," : "", + ((SExprNode *)pFunc)->userAlias); + start = buf + VARSTR_HEADER_SIZE + len; + } + } + } + varDataSetLen(buf, len); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, buf, false); + + numOfRows++; + mndReleaseSma(pMnode, pSma); + mndReleaseDb(pMnode, pSrcDb); + } + mndReleaseDb(pMnode, pDb); + pShow->numOfRows += numOfRows; + if (numOfRows < rows) pShow->pIter = NULL; + return numOfRows; +} + +static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) { +} diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bf6b9f862e..731c316c95 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -107,6 +107,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); // for msgs inside mnode + // TODO change the name mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode); mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index da4647af0c..2685e246f1 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1890,7 +1890,7 @@ static bool needDbShowStmt(ENodeType type) { return QUERY_NODE_SHOW_TABLES_STMT == type || QUERY_NODE_SHOW_STABLES_STMT == type || QUERY_NODE_SHOW_VGROUPS_STMT == type || QUERY_NODE_SHOW_INDEXES_STMT == type || QUERY_NODE_SHOW_TAGS_STMT == type || QUERY_NODE_SHOW_TABLE_TAGS_STMT == type || - QUERY_NODE_SHOW_VIEWS_STMT == type; + QUERY_NODE_SHOW_VIEWS_STMT == type || QUERY_NODE_SHOW_TSMAS_STMT == type; } SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type) { diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index b032c15e09..280d03df44 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -775,6 +775,11 @@ static int32_t collectMetaKeyFromCreateTSMAStmt(SCollectMetaKeyCxt* pCxt, SCreat return code; } +static int32_t collectMetaKeyFromShowTSMASStmt(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { + return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TSMAS, + pCxt->pMetaCache); +} + static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { pCxt->pStmt = pStmt; switch (nodeType(pStmt)) { @@ -909,6 +914,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromCreateTSMAStmt(pCxt, (SCreateTSMAStmt*)pStmt); case QUERY_NODE_DROP_TSMA_STMT: break; + case QUERY_NODE_SHOW_TSMAS_STMT: + return collectMetaKeyFromShowTSMASStmt(pCxt, (SShowStmt*)pStmt); default: break; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 92f8e79e20..39bec5c48d 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -305,6 +305,10 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = { { .showType = QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT, .pDbName = TSDB_INFORMATION_SCHEMA_DB, .pTableName = TSDB_INS_TABLE_MACHINES, + }, + { .showType = QUERY_NODE_SHOW_TSMAS_STMT, + .pDbName = TSDB_INFORMATION_SCHEMA_DB, + .pTableName = TSDB_INS_TABLE_TSMAS, .numOfShowCols = 1, .pShowCols = {"*"} }, @@ -12491,6 +12495,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { case QUERY_NODE_SHOW_GRANTS_LOGS_STMT: case QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT: case QUERY_NODE_SHOW_ARBGROUPS_STMT: + case QUERY_NODE_SHOW_TSMAS_STMT: code = rewriteShow(pCxt, pQuery); break; case QUERY_NODE_SHOW_VGROUPS_STMT: