From 5ed28febbd8f8ba77da8e648b7757288fb1dea40 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 4 Jun 2020 16:00:01 +0000 Subject: [PATCH] [TD-464] show queries and show streams --- src/client/inc/tscProfile.h | 2 +- src/client/src/tscProfile.c | 89 ++-- src/client/src/tscServer.c | 51 +- src/inc/taosdef.h | 3 +- src/inc/taosmsg.h | 13 +- src/mnode/inc/mnodeDef.h | 2 - src/mnode/inc/mnodeProfile.h | 7 + src/mnode/src/mnodeProfile.c | 717 +++++++++++--------------- src/mnode/src/mnodeShow.c | 20 +- src/plugins/monitor/src/monitorMain.c | 2 +- 10 files changed, 394 insertions(+), 512 deletions(-) diff --git a/src/client/inc/tscProfile.h b/src/client/inc/tscProfile.h index e82b7242a8..193fac0fb0 100644 --- a/src/client/inc/tscProfile.h +++ b/src/client/inc/tscProfile.h @@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql); void tscRemoveFromSqlList(SSqlObj *pSql); void tscAddIntoStreamList(SSqlStream *pStream); void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj); -int tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj); +int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj); void tscKillQuery(STscObj *pObj, uint32_t killId); void tscKillStream(STscObj *pObj, uint32_t killId); void tscKillConnection(STscObj *pObj); diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 2162408ee6..09a34eb0c3 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -19,6 +19,7 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" +#include "taosmsg.h" void tscSaveSlowQueryFp(void *handle, void *tmrId); void *tscSlowQueryConn = NULL; @@ -96,9 +97,9 @@ void tscSaveSlowQuery(SSqlObj *pSql) { char *sql = malloc(200); int len = snprintf(sql, 200, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName, pSql->pTscObj->user, pSql->stime, pSql->res.useconds); - int sqlLen = snprintf(sql + len, TSDB_SHOW_SQL_LEN, "%s", pSql->sqlstr); - if (sqlLen > TSDB_SHOW_SQL_LEN - 1) { - sqlLen = len + TSDB_SHOW_SQL_LEN - 1; + int sqlLen = snprintf(sql + len, TSDB_SLOW_QUERY_SQL_LEN, "%s", pSql->sqlstr); + if (sqlLen > TSDB_SLOW_QUERY_SQL_LEN - 1) { + sqlLen = len + TSDB_SLOW_QUERY_SQL_LEN - 1; } else { sqlLen += len; } @@ -206,72 +207,60 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { taos_close_stream(pStream); } -int tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { - char * pStart = pMsg; - char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256; - - SQqueryList *pQList = (SQqueryList *)pMsg; - pQList->numOfQueries = 0; +int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { + SCMHeartBeatMsg *pHeartbeat = pMsg; + pHeartbeat->connId = htonl(pObj->connId); - SQueryDesc *pQdesc = (SQueryDesc*)(pMsg + sizeof(SQqueryList)); + pHeartbeat->numOfQueries = 0; + SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData; // We extract the lock to tscBuildHeartBeatMsg function. - /* pthread_mutex_lock (&pObj->mutex); */ - pMsg += sizeof(SQqueryList); - SSqlObj *pSql = pObj->sqlList; - while (pSql) { + + //SSqlObj *pSql = pObj->sqlList; + //while (pSql) { /* * avoid sqlobj may not be correctly removed from sql list * e.g., forgetting to free the sql result may cause the sql object still in sql list */ - if (pSql->sqlstr == NULL) { - pSql = pSql->next; - continue; - } + // if (pSql->sqlstr == NULL) { + // pSql = pSql->next; + // continue; + // } - strncpy(pQdesc->sql, pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); + strncpy(pQdesc->sql, "select * from d1.t1", TSDB_SHOW_SQL_LEN - 1); pQdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0; - pQdesc->stime = pSql->stime; - pQdesc->queryId = pSql->queryId; - pQdesc->useconds = pSql->res.useconds; + pQdesc->stime = htobe64(taosGetTimestampMs()); + pQdesc->queryId = htonl(12); + pQdesc->useconds = htonl(34567); - pQList->numOfQueries++; + pHeartbeat->numOfQueries++; pQdesc++; - pSql = pSql->next; - pMsg += sizeof(SQueryDesc); - if (pMsg > pMax) break; - } + //pSql = pSql->next; + //} - SStreamList *pSList = (SStreamList *)pMsg; - pSList->numOfStreams = 0; - - SStreamDesc *pSdesc = (SStreamDesc*) (pMsg + sizeof(SStreamList)); + pHeartbeat->numOfStreams = 0; + SStreamDesc *pSdesc = (SStreamDesc *)pQdesc; - pMsg += sizeof(SStreamList); - SSqlStream *pStream = pObj->streamList; - while (pStream) { - strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); + //SSqlStream *pStream = pObj->streamList; + //while (pStream) { + strncpy(pSdesc->sql, "select * from d1.s1", TSDB_SHOW_SQL_LEN - 1); pSdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0; - pSdesc->streamId = pStream->streamId; - pSdesc->num = pStream->num; + pSdesc->streamId = htonl(98); + pSdesc->num = htobe64(76543); - pSdesc->useconds = pStream->useconds; - pSdesc->stime = pStream->stime - pStream->interval; - pSdesc->ctime = pStream->ctime; + pSdesc->useconds = htobe64(21); + pSdesc->stime = htobe64(taosGetTimestampMs()-1000); + pSdesc->ctime = htobe64(taosGetTimestampMs()); - pSdesc->slidingTime = pStream->slidingTime; - pSdesc->interval = pStream->interval; + pSdesc->slidingTime = htobe64(567); + pSdesc->interval = htobe64(89); - pSList->numOfStreams++; + pHeartbeat->numOfStreams++; pSdesc++; - pStream = pStream->next; - pMsg += sizeof(SStreamDesc); - if (pMsg > pMax) break; - } + //pStream = pStream->next; + //} - /* pthread_mutex_unlock (&pObj->mutex); */ - - return pMsg - pStart; + return pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg); } void tscKillConnection(STscObj *pObj) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5fd58178b1..c4a44f5b76 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1747,52 +1747,45 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) { - int size = 0; - STscObj *pObj = pSql->pTscObj; - - size += tsRpcHeadSize; - size += sizeof(SQqueryList); - - SSqlObj *tpSql = pObj->sqlList; - while (tpSql) { - size += sizeof(SQueryDesc); - tpSql = tpSql->next; - } - - size += sizeof(SStreamList); - SSqlStream *pStream = pObj->streamList; - while (pStream) { - size += sizeof(SStreamDesc); - pStream = pStream->next; - } - - return size + TSDB_EXTRA_PAYLOAD_SIZE; -} - int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; pthread_mutex_lock(&pObj->mutex); - int size = tscEstimateHeartBeatMsgLength(pSql); + int32_t numOfQueries = 0; + SSqlObj *tpSql = pObj->sqlList; + while (tpSql) { + tpSql = tpSql->next; + numOfQueries++; + } + + int32_t numOfStreams = 0; + SSqlStream *pStream = pObj->streamList; + while (pStream) { + pStream = pStream->next; + numOfStreams++; + } + + // ==> + numOfQueries = 1; + numOfStreams = 1; + + int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { pthread_mutex_unlock(&pObj->mutex); tscError("%p failed to malloc for heartbeat msg", pSql); return -1; } - SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg*)pCmd->payload; - pHeartbeat->connId = htonl(pSql->pTscObj->connId); + SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; + int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj); - int msgLen = tscBuildQueryStreamDesc((char*)pHeartbeat + sizeof(pHeartbeat->connId), pObj); pthread_mutex_unlock(&pObj->mutex); - pCmd->payloadLen = msgLen + sizeof(pHeartbeat->connId); + pCmd->payloadLen = msgLen; pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT; - assert(msgLen + minMsgSize() <= size); return TSDB_CODE_SUCCESS; } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 7ec9aef295..f9a442373c 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -231,7 +231,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_SHELL_VNODE_BITS 24 #define TSDB_SHELL_SID_MASK 0xFF #define TSDB_HTTP_TOKEN_LEN 20 -#define TSDB_SHOW_SQL_LEN 512 +#define TSDB_SHOW_SQL_LEN 64 +#define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_METER_STATE_OFFLINE 0 #define TSDB_METER_STATE_ONLLINE 1 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 9c14a7dc54..380d89e8da 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -707,18 +707,11 @@ typedef struct { int64_t interval; } SStreamDesc; -typedef struct { - int32_t numOfQueries; -} SQqueryList; - -typedef struct { - int32_t numOfStreams; -} SStreamList; - typedef struct { uint32_t connId; - SQqueryList qlist; - SStreamList slist; + int32_t numOfQueries; + int32_t numOfStreams; + char pData[]; } SCMHeartBeatMsg; typedef struct { diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index 594fd3fd50..cf9058b9cf 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -182,8 +182,6 @@ typedef struct SUserObj { int8_t updateEnd[1]; int32_t refCount; struct SAcctObj * pAcct; - SQqueryList * pQList; // query list - SStreamList * pSList; // stream list } SUserObj; typedef struct { diff --git a/src/mnode/inc/mnodeProfile.h b/src/mnode/inc/mnodeProfile.h index a8fe210015..30745db035 100644 --- a/src/mnode/inc/mnodeProfile.h +++ b/src/mnode/inc/mnodeProfile.h @@ -29,6 +29,12 @@ typedef struct { uint32_t connId; uint64_t stime; uint64_t lastAccess; + uint32_t queryId; + uint32_t streamId; + int32_t numOfQueries; + int32_t numOfStreams; + SStreamDesc *pStreams; + SQueryDesc * pQueries; } SConnObj; int32_t mnodeInitProfile(); @@ -37,6 +43,7 @@ void mnodeCleanupProfile(); SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port); SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port); void mnodeReleaseConn(SConnObj *pConn); +int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index de041d862d..17e486a0eb 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -36,8 +36,9 @@ #include "mnodeVgroup.h" #include "mnodeWrite.h" -#define CONN_KEEP_TIME (tsShellActivityTimer * 3) +#define CONN_KEEP_TIME (tsShellActivityTimer * 3) #define CONN_CHECK_TIME (tsShellActivityTimer * 2) +#define QUERY_ID_SIZE 20 extern void *tsMnodeTmr; static SCacheObj *tsMnodeConnCache = NULL; @@ -49,15 +50,11 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); - static void mnodeFreeConn(void *data); static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg); -// static int32_t mnodeKillQuery(char *qidstr, void *pConn); -// static int32_t mnodeKillStream(char *qidstr, void *pConn); - int32_t mnodeInitProfile() { mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_QUERIES, mnodeGetQueryMeta); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_QUERIES, mnodeRetrieveQueries); @@ -140,6 +137,9 @@ SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t p static void mnodeFreeConn(void *data) { SConnObj *pConn = data; + tfree(pConn->pQueries); + tfree(pConn->pQueries); + mTrace("connId:%d, is destroyed", pConn->connId); } @@ -165,362 +165,11 @@ static void *mnodeGetNextConn(SHashMutableIterator *pIter, SConnObj **pConn) { return pIter; } -typedef struct { - int numOfConns; - int index; - SConnObj connInfo[]; -} SConnShow; - -typedef struct { - uint32_t ip; - uint16_t port; - char user[TSDB_TABLE_ID_LEN+ 1]; -} SCDesc; - -typedef struct { - int32_t index; - int32_t numOfQueries; - SCDesc * connInfo; - SCDesc **cdesc; - SQueryDesc qdesc[]; -} SQueryShow; - -typedef struct { - int32_t index; - int32_t numOfStreams; - SCDesc * connInfo; - SCDesc **cdesc; - SStreamDesc sdesc[]; -} SStreamShow; - -int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) { -// SAcctObj *pAcct = pConn->pAcct; -// -// if (contLen <= 0 || pAcct == NULL) { -// return 0; -// } -// -// pthread_mutex_lock(&pAcct->mutex); -// -// if (pConn->pQList) { -// pAcct->acctInfo.numOfQueries -= pConn->pQList->numOfQueries; -// pAcct->acctInfo.numOfStreams -= pConn->pSList->numOfStreams; -// } -// -// pConn->pQList = realloc(pConn->pQList, contLen); -// memcpy(pConn->pQList, cont, contLen); -// -// pConn->pSList = (SStreamList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SQueryDesc) + sizeof(SQqueryList)); -// -// pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries; -// pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams; -// -// pthread_mutex_unlock(&pAcct->mutex); - - return TSDB_CODE_SUCCESS; -} - -int32_t mnodeGetQueries(SShowObj *pShow, void *pConn) { -// SAcctObj * pAcct = pConn->pAcct; -// SQueryShow *pQueryShow; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pQueryShow = malloc(sizeof(SQueryDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow)); -// pQueryShow->numOfQueries = 0; -// pQueryShow->index = 0; -// pQueryShow->connInfo = NULL; -// pQueryShow->cdesc = NULL; -// -// if (pAcct->acctInfo.numOfQueries > 0) { -// pQueryShow->connInfo = (SCDesc *)malloc(pAcct->acctInfo.numOfConns * sizeof(SCDesc)); -// pQueryShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfQueries * sizeof(SCDesc *)); -// -// pConn = pAcct->pConn; -// SQueryDesc * pQdesc = pQueryShow->qdesc; -// SCDesc * pCDesc = pQueryShow->connInfo; -// SCDesc **ppCDesc = pQueryShow->cdesc; -// -// while (pConn) { -// if (pConn->pQList && pConn->pQList->numOfQueries > 0) { -// pCDesc->ip = pConn->ip; -// pCDesc->port = pConn->port; -// strcpy(pCDesc->user, pConn->pUser->user); -// -// memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SQueryDesc) * pConn->pQList->numOfQueries); -// pQdesc += pConn->pQList->numOfQueries; -// pQueryShow->numOfQueries += pConn->pQList->numOfQueries; -// for (int32_t i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc; -// -// pCDesc++; -// } -// pConn = pConn->next; -// } -// } -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// // sorting based on useconds -// -// pShow->pIter = pQueryShow; - - return 0; -} - -int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { - int32_t cols = 0; - - SSchema *pSchema = pMeta->schema; - - pShow->bytes[cols] = TSDB_USER_LEN; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "user"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 14; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "ip:port:id"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "created_time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; - strcpy(pSchema[cols].name, "time(us)"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = TSDB_SHOW_SQL_LEN; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "sql"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htons(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - - pShow->numOfRows = 1000000; - pShow->pIter = NULL; - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - - mnodeGetQueries(pShow, pConn); - return 0; -} - -int32_t mnodeKillQuery(char *qidstr, void *pConn) { -// char *temp, *chr, idstr[64]; -// strcpy(idstr, qidstr); -// -// temp = idstr; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint32_t ip = inet_addr(temp); -// -// temp = chr + 1; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint16_t port = htons(atoi(temp)); -// -// temp = chr + 1; -// uint32_t queryId = atoi(temp); -// -// SAcctObj *pAcct = pConn->pAcct; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pConn = pAcct->pConn; -// while (pConn) { -// if (pConn->ip == ip && pConn->port == port && pConn->pQList) { -// int32_t i; -// SQueryDesc *pQDesc = pConn->pQList->qdesc; -// for (i = 0; i < pConn->pQList->numOfQueries; ++i, ++pQDesc) { -// if (pQDesc->queryId == queryId) break; -// } -// -// if (i < pConn->pQList->numOfQueries) break; -// } -// -// pConn = pConn->next; -// } -// -// if (pConn) pConn->queryId = queryId; -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// if (pConn == NULL || pConn->pQList == NULL || pConn->pQList->numOfQueries == 0) goto _error; -// -// mTrace("query:%s is there, kill it", qidstr); -// return 0; -// -//_error: -// mTrace("query:%s is not there", qidstr); - - return TSDB_CODE_INVALID_QUERY_ID; -} - -int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - int32_t numOfRows = 0; - char *pWrite; - int32_t cols = 0; - - SQueryShow *pQueryShow = (SQueryShow *)pShow->pIter; - - if (rows > pQueryShow->numOfQueries - pQueryShow->index) rows = pQueryShow->numOfQueries - pQueryShow->index; - - while (numOfRows < rows) { - SQueryDesc *pNode = pQueryShow->qdesc + pQueryShow->index; - SCDesc *pCDesc = pQueryShow->cdesc[pQueryShow->index]; - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pCDesc->user); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - uint32_t ip = pCDesc->ip; - sprintf(pWrite, "%d.%d.%d.%d:%hu:%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pCDesc->port), - pNode->queryId); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->stime; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->useconds; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pNode->sql); - cols++; - - numOfRows++; - pQueryShow->index++; - } - - if (numOfRows == 0) { - tfree(pQueryShow->cdesc); - tfree(pQueryShow->connInfo); - tfree(pQueryShow); - } - - pShow->numOfReads += numOfRows; - return numOfRows; -} - -int32_t mnodeGetStreams(SShowObj *pShow, void *pConn) { -// SAcctObj * pAcct = pConn->pAcct; -// SStreamShow *pStreamShow; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pStreamShow = malloc(sizeof(SStreamDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow)); -// pStreamShow->numOfStreams = 0; -// pStreamShow->index = 0; -// pStreamShow->connInfo = NULL; -// pStreamShow->cdesc = NULL; -// -// if (pAcct->acctInfo.numOfStreams > 0) { -// pStreamShow->connInfo = (SCDesc *)malloc(pAcct->acctInfo.numOfConns * sizeof(SCDesc)); -// pStreamShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfStreams * sizeof(SCDesc *)); -// -// pConn = pAcct->pConn; -// SStreamDesc * pSdesc = pStreamShow->sdesc; -// SCDesc * pCDesc = pStreamShow->connInfo; -// SCDesc **ppCDesc = pStreamShow->cdesc; -// -// while (pConn) { -// if (pConn->pSList && pConn->pSList->numOfStreams > 0) { -// pCDesc->ip = pConn->ip; -// pCDesc->port = pConn->port; -// strcpy(pCDesc->user, pConn->pUser->user); -// -// memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SStreamDesc) * pConn->pSList->numOfStreams); -// pSdesc += pConn->pSList->numOfStreams; -// pStreamShow->numOfStreams += pConn->pSList->numOfStreams; -// for (int32_t i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc; -// -// pCDesc++; -// } -// pConn = pConn->next; -// } -// } -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// // sorting based on useconds -// -// pShow->pIter = pStreamShow; - - return 0; -} - - -int32_t mnodeKillStream(char *qidstr, void *pConn) { -// char *temp, *chr, idstr[64]; -// strcpy(idstr, qidstr); -// -// temp = idstr; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint32_t ip = inet_addr(temp); -// -// temp = chr + 1; -// chr = strchr(temp, ':'); -// if (chr == NULL) goto _error; -// *chr = 0; -// uint16_t port = htons(atoi(temp)); -// -// temp = chr + 1; -// uint32_t streamId = atoi(temp); -// -// SAcctObj *pAcct = pConn->pAcct; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pConn = pAcct->pConn; -// while (pConn) { -// if (pConn->ip == ip && pConn->port == port && pConn->pSList) { -// int32_t i; -// SStreamDesc *pSDesc = pConn->pSList->sdesc; -// for (i = 0; i < pConn->pSList->numOfStreams; ++i, ++pSDesc) { -// if (pSDesc->streamId == streamId) break; -// } -// -// if (i < pConn->pSList->numOfStreams) break; -// } -// -// pConn = pConn->next; -// } -// -// if (pConn) pConn->streamId = streamId; -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// if (pConn == NULL || pConn->pSList == NULL || pConn->pSList->numOfStreams == 0) goto _error; -// -// mTrace("stream:%s is there, kill it", qidstr); -// return 0; -// -//_error: -// mTrace("stream:%s is not there", qidstr); - - return TSDB_CODE_INVALID_STREAM_ID; -} - -int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SUserObj *pUser = mnodeGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + int32_t cols = 0; SSchema *pSchema = pMeta->schema; @@ -568,7 +217,7 @@ int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { return 0; } -int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; SConnObj *pConnObj = NULL; int32_t cols = 0; @@ -606,71 +255,319 @@ int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pCon } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 4; + const int32_t NUM_OF_COLUMNS = 5; mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); return numOfRows; } +// not thread safe, need optimized +int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg) { + pConn->numOfQueries = htonl(pHBMsg->numOfQueries); + if (pConn->numOfQueries > 0) { + pConn->pQueries = calloc(sizeof(SQueryDesc), pConn->numOfQueries); + memcpy(pConn->pQueries, pHBMsg->pData, pConn->numOfQueries * sizeof(SQueryDesc)); + } + + pConn->numOfQueries = htonl(pHBMsg->numOfQueries); + if (pConn->numOfQueries > 0) { + pConn->pStreams = calloc(sizeof(SStreamDesc), pConn->numOfStreams); + memcpy(pConn->pStreams, pHBMsg->pData + pConn->numOfQueries * sizeof(SQueryDesc), + pConn->numOfStreams * sizeof(SStreamDesc)); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SUserObj *pUser = mnodeGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "queryId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "user"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip:port"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "created time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema[cols].name, "time(us)"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = 1000000; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + return 0; +} + +static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + SConnObj *pConnObj = NULL; + int32_t cols = 0; + char * pWrite; + char ipStr[TSDB_IPv4ADDR_LEN + 7]; + + while (numOfRows < rows) { + pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj); + if (pConnObj == NULL) break; + + for (int32_t i = 0; i < pConnObj->numOfQueries; ++i) { + SQueryDesc *pDesc = pConnObj->pQueries + i; + cols = 0; + + snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, QUERY_ID_SIZE); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, TSDB_USER_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + snprintf(ipStr, TSDB_IPv4ADDR_LEN + 6, "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, TSDB_IPv4ADDR_LEN + 6); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->stime); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->useconds); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, TSDB_SHOW_SQL_LEN); + cols++; + + numOfRows++; + } + } + + pShow->numOfReads += numOfRows; + const int32_t NUM_OF_COLUMNS = 6; + mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + return numOfRows; +} + static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SUserObj *pUser = mnodeGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "streamId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "user"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip:port"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "created time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "exec time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema[cols].name, "time(us)"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "cycles"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = 1000000; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + return 0; } static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - return 0; + int32_t numOfRows = 0; + SConnObj *pConnObj = NULL; + int32_t cols = 0; + char * pWrite; + char ipStr[TSDB_IPv4ADDR_LEN + 7]; + + while (numOfRows < rows) { + pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj); + if (pConnObj == NULL) break; + + for (int32_t i = 0; i < pConnObj->numOfStreams; ++i) { + SStreamDesc *pDesc = pConnObj->pStreams + i; + cols = 0; + + snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->streamId)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, QUERY_ID_SIZE); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, TSDB_USER_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + snprintf(ipStr, TSDB_IPv4ADDR_LEN + 6, "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, TSDB_IPv4ADDR_LEN + 6); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->ctime); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->stime); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->useconds); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, TSDB_SHOW_SQL_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = htonl(pDesc->num); + cols++; + + numOfRows++; + } + } + + pShow->numOfReads += numOfRows; + const int32_t NUM_OF_COLUMNS = 8; + mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + return numOfRows; } -int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { - // SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; +static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) { + SUserObj *pUser = pMsg->pUser; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont; + char* connIdStr = pKill->queryId; + + char *chr = strchr(connIdStr, ':'); + if (chr == NULL) return TSDB_CODE_INVALID_QUERY_ID; + *chr = 0; + + uint32_t queryId = atoi(chr + 1); + + SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); + if (pConn == NULL) { + mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId); + return TSDB_CODE_INVALID_CONNECTION; + } else { + mPrint("connId:%s, queryId:%d is killed by user:%s", connIdStr, queryId, pUser->user); + pConn->queryId = queryId; + taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false); + return TSDB_CODE_SUCCESS; + } +} + +static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { + SUserObj *pUser = pMsg->pUser; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont; + char* connIdStr = pKill->queryId; - // SUserObj *pUser = mnodeGetUserFromConn(pMsg->thandle); - // if (pUser == NULL) { - // rpcRsp.code = TSDB_CODE_INVALID_USER; - // rpcSendResponse(&rpcRsp); - // return; - // } + char *chr = strchr(connIdStr, ':'); + if (chr == NULL) return TSDB_CODE_INVALID_QUERY_ID; + *chr = 0; - // SCMKillQueryMsg *pKill = pMsg->pCont; - // int32_t code; + uint32_t streamId = atoi(chr + 1); - // if (!pUser->writeAuth) { - // code = TSDB_CODE_NO_RIGHTS; - // } else { - // code = mgmtKillQuery(pKill->queryId, pMsg->thandle); - // } - - // rpcRsp.code = code; - // rpcSendResponse(&rpcRsp); - // mnodeDecUserRef(pUser); - return TSDB_CODE_SUCCESS; + SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr); + if (pConn == NULL) { + mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId); + return TSDB_CODE_INVALID_CONNECTION; + } else { + mPrint("connId:%s, streamId:%d is killed by user:%s", connIdStr, streamId, pUser->user); + pConn->streamId = streamId; + taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false); + return TSDB_CODE_SUCCESS; + } } -int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) { - // SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - - // SUserObj *pUser = mnodeGetUserFromConn(pMsg->thandle); - // if (pUser == NULL) { - // rpcRsp.code = TSDB_CODE_INVALID_USER; - // rpcSendResponse(&rpcRsp); - // return; - // } - - // SCMKillStreamMsg *pKill = pMsg->pCont; - // int32_t code; - - // if (!pUser->writeAuth) { - // code = TSDB_CODE_NO_RIGHTS; - // } else { - // code = mgmtKillStream(pKill->queryId, pMsg->thandle); - // } - - // rpcRsp.code = code; - // rpcSendResponse(&rpcRsp); - // mnodeDecUserRef(pUser); - return TSDB_CODE_SUCCESS; -} - -int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { +static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { SUserObj *pUser = pMsg->pUser; if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 0713328df5..c1edd309c8 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -243,23 +243,27 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { // pHBRsp->killConnection = 1; } else { pHBRsp->connId = htonl(pConn->connId); + mnodeSaveQueryStreamList(pConn, pHBMsg); + if (pConn->killed != 0) { pHBRsp->killConnection = 1; } + + if (pConn->streamId != 0) { + pHBRsp->streamId = htonl(pConn->streamId); + pConn->streamId = 0; + } + + if (pConn->queryId != 0) { + pHBRsp->queryId = htonl(pConn->queryId); + pConn->queryId = 0; + } } pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum()); pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); mnodeGetMnodeIpSetForShell(&pHBRsp->ipList); - /* - * TODO - * Dispose kill stream or kill query message - */ - // pHBRsp->queryId = 0; - // pHBRsp->streamId = 0; - // pHBRsp->killConnection = 0; - pMsg->rpcRsp.rsp = pHBRsp; pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index 0c27233289..5275dd9d6c 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -183,7 +183,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.slowquery(ts timestamp, username " "binary(%d), created_time timestamp, time bigint, sql binary(%d))", - tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SHOW_SQL_LEN); + tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SLOW_QUERY_SQL_LEN); } else if (cmd == MONITOR_CMD_CREATE_TB_LOG) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.log(ts timestamp, level tinyint, "