From 8a850dd3f2fb650e7c5b367e5f2b4fe2d4f7e84b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 3 Dec 2021 15:01:26 +0800 Subject: [PATCH] TD-10431 process mnode profile --- include/common/taosmsg.h | 1 + source/dnode/mgmt/impl/src/dndTransport.c | 1 + source/dnode/mnode/impl/inc/mndInt.h | 2 +- source/dnode/mnode/impl/src/mndDb.c | 9 +- source/dnode/mnode/impl/src/mndDnode.c | 9 +- source/dnode/mnode/impl/src/mndMnode.c | 4 +- source/dnode/mnode/impl/src/mndProfile.c | 142 ++++++++++++++++++++-- source/dnode/mnode/impl/src/mndShow.c | 10 +- source/dnode/mnode/impl/src/mndUser.c | 3 +- source/dnode/mnode/impl/src/mnode.c | 2 +- 10 files changed, 157 insertions(+), 26 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index e29accd249..e14788be47 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -81,6 +81,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" ) diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index f3e71564b2..98a0b8e308 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -76,6 +76,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_SHOW] = dndProcessMnodeReadMsg; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 0f4dd0633b..dbca6ad057 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -25,7 +25,7 @@ extern "C" { #endif -typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg); +typedef int32_t (*MndMsgFp)(SMnodeMsg *pMsg); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index d990e926bb..f01a43dfef 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "mndDb.h" -static int32_t mnodeProcessUseMsg(SMnode *pMnode, SMnodeMsg *pMsg); +static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg); int32_t mndInitDb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_USE_DB, mnodeProcessUseMsg); @@ -35,10 +35,11 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) { sdbRelease(pSdb, pDb); } -static int32_t mnodeProcessUseMsg(SMnode *pMnode, SMnodeMsg *pMsg) { - SUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont; +static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SUseDbMsg *pUse = pMsg->rpcMsg.pCont; - strncpy(pMsg->db, pUseDbMsg->db, TSDB_FULL_DB_NAME_LEN); + strncpy(pMsg->db, pUse->db, TSDB_FULL_DB_NAME_LEN); SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); if (pDb != NULL) { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index a6cee3efc5..24e6113161 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -226,7 +226,8 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) { pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); } -static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { +static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; SStatusMsg *pStatus = pMsg->rpcMsg.pCont; mndParseStatusMsg(pStatus); @@ -315,11 +316,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessCreateDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessConfigDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { return 0; } int32_t mndInitDnode(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_DNODE, diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index c1c17e4983..e3464cd327 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -103,9 +103,9 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { return sdbWrite(pMnode->pSdb, pRaw); } -static int32_t mndProcessCreateMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessDropMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { return 0; } int32_t mndInitMnode(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_MNODE, diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index ef596f8ee1..d70d045a04 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -50,8 +50,11 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *user, uint static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); -static int32_t mndProcessHeartBeatMsg(SMnode *pMnode, SMnodeMsg *pMsg); -static int32_t mndProcessConnectMsg(SMnode *pMnode, SMnodeMsg *pMsg); +static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg); +static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg); +static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg); +static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg); +static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg); static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow); @@ -74,10 +77,14 @@ int32_t mndInitProfile(SMnode *pMnode) { mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_HEARTBEAT, mndProcessHeartBeatMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONNECT, mndProcessConnectMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_KILL_QUERY, mndProcessKillQueryMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONNECT, mndProcessKillStreamMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_KILL_CONN, mndProcessKillConnectionMsg); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); + return 0; } @@ -183,7 +190,8 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); } -static int32_t mndProcessConnectMsg(SMnode *pMnode, SMnodeMsg *pMsg) { +static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; SConnectMsg *pReq = pMsg->rpcMsg.pCont; pReq->pid = htonl(pReq->pid); @@ -240,7 +248,8 @@ static int32_t mndProcessConnectMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessHeartBeatMsg(SMnode *pMnode, SMnodeMsg *pMsg) { +static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; SHeartBeatMsg *pReq = pMsg->rpcMsg.pCont; pReq->connId = htonl(pReq->connId); pReq->pid = htonl(pReq->pid); @@ -280,6 +289,118 @@ static int32_t mndProcessHeartBeatMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); + if (pUser == NULL) return 0; + if (!pUser->superAuth) { + mndReleaseUser(pMnode, pUser); + terrno = TSDB_CODE_MND_NO_RIGHTS; + return -1; + } + mndReleaseUser(pMnode, pUser); + + SKillQueryMsg *pKill = pMsg->rpcMsg.pCont; + mInfo("kill query msg is received, queryId:%s", pKill->queryId); + + const char delim = ':'; + char *connIdStr = strtok(pKill->queryId, &delim); + char *queryIdStr = strtok(NULL, &delim); + + if (queryIdStr == NULL || connIdStr == NULL) { + mError("failed to kill query, queryId:%s", pKill->queryId); + terrno = TSDB_CODE_MND_INVALID_QUERY_ID; + return -1; + } + + int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); + + int32_t connId = atoi(connIdStr); + SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); + if (pConn == NULL) { + mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId); + terrno = TSDB_CODE_MND_INVALID_CONN_ID; + return -1; + } else { + mInfo("connId:%s, queryId:%d is killed by user:%s", connIdStr, queryId, pMsg->user); + pConn->queryId = queryId; + taosCacheRelease(pMgmt->cache, (void **)&pConn, false); + return 0; + } +} + +static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); + if (pUser == NULL) return 0; + if (!pUser->superAuth) { + mndReleaseUser(pMnode, pUser); + terrno = TSDB_CODE_MND_NO_RIGHTS; + return -1; + } + mndReleaseUser(pMnode, pUser); + + SKillQueryMsg *pKill = pMsg->rpcMsg.pCont; + mInfo("kill stream msg is received, streamId:%s", pKill->queryId); + + const char delim = ':'; + char *connIdStr = strtok(pKill->queryId, &delim); + char *streamIdStr = strtok(NULL, &delim); + + if (streamIdStr == NULL || connIdStr == NULL) { + mError("failed to kill stream, streamId:%s", pKill->queryId); + terrno = TSDB_CODE_MND_INVALID_STREAM_ID; + return -1; + } + + int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10); + int32_t connId = atoi(connIdStr); + + SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); + if (pConn == NULL) { + mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId); + terrno = TSDB_CODE_MND_INVALID_CONN_ID; + return -1; + } else { + mInfo("connId:%s, streamId:%d is killed by user:%s", connIdStr, streamId, pMsg->user); + pConn->streamId = streamId; + taosCacheRelease(pMgmt->cache, (void **)&pConn, false); + return TSDB_CODE_SUCCESS; + } +} + +static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); + if (pUser == NULL) return 0; + if (!pUser->superAuth) { + mndReleaseUser(pMnode, pUser); + terrno = TSDB_CODE_MND_NO_RIGHTS; + return -1; + } + mndReleaseUser(pMnode, pUser); + + SKillConnMsg *pKill = pMsg->rpcMsg.pCont; + int32_t connId = atoi(pKill->queryId); + SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); + if (pConn == NULL) { + mError("connId:%s, failed to kill, conn not exist", pKill->queryId); + terrno = TSDB_CODE_MND_INVALID_CONN_ID; + return -1; + } else { + mInfo("connId:%s, is killed by user:%s", pKill->queryId, pMsg->user); + pConn->killed = 1; + taosCacheRelease(pMgmt->cache, (void **)&pConn, false); + return TSDB_CODE_SUCCESS; + } +} + static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { SMnode *pMnode = pMsg->pMnode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -288,7 +409,8 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * if (pUser == NULL) return 0; if (!pUser->superAuth) { mndReleaseUser(pMnode, pUser); - return TSDB_CODE_MND_NO_RIGHTS; + terrno = TSDB_CODE_MND_NO_RIGHTS; + return -1; } mndReleaseUser(pMnode, pUser); @@ -415,7 +537,8 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj * if (pUser == NULL) return 0; if (!pUser->superAuth) { mndReleaseUser(pMnode, pUser); - return TSDB_CODE_MND_NO_RIGHTS; + terrno = TSDB_CODE_MND_NO_RIGHTS; + return -1; } mndReleaseUser(pMnode, pUser); @@ -519,8 +642,8 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t numOfRows = 0; SConnObj *pConnObj = NULL; int32_t cols = 0; - char * pWrite; - void * pIter; + char *pWrite; + void *pIter; char str[TSDB_IPv4ADDR_LEN + 6] = {0}; while (numOfRows < rows) { @@ -621,7 +744,8 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj if (pUser == NULL) return 0; if (!pUser->superAuth) { mndReleaseUser(pMnode, pUser); - return TSDB_CODE_MND_NO_RIGHTS; + terrno = TSDB_CODE_MND_NO_RIGHTS; + return -1; } mndReleaseUser(pMnode, pUser); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 790f5c1737..ff9f2a9805 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "mndShow.h" -static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg); -static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMsg); +static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg); +static int32_t mndProcessRetrieveMsg( SMnodeMsg *pMsg); static bool mndCheckRetrieveFinished(SShowObj *pShow); static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); @@ -42,7 +42,8 @@ int32_t mndInitShow(SMnode *pMnode) { void mndCleanupShow(SMnode *pMnode) {} -static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) { +static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { + SMnode *pMnode = pMnodeMsg->pMnode; SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMsg *pMsg = pMnodeMsg->rpcMsg.pCont; int8_t type = pMsg->type; @@ -108,7 +109,8 @@ static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) { } } -static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) { +static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { + SMnode *pMnode = pMnodeMsg->pMnode; SShowMgmt *pMgmt = &pMnode->showMgmt; int32_t rowsToRead = 0; int32_t size = 0; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 78869bd416..d3a8cf56be 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -180,7 +180,8 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, return 0; } -static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) { +static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; if (pCreate->user[0] == 0) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index e390deda6d..3a5b6c470b 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -386,7 +386,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { goto PROCESS_RPC_END; } - code = (*fp)(pMnode, pMsg); + code = (*fp)(pMsg); if (code != 0) { code = terrno; mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());