From 3a056eddea7072aae20d666e62be6db4bbfe777c Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 4 Mar 2020 20:49:23 +0800 Subject: [PATCH 1/5] for cluster module --- src/dnode/src/dnodeMgmt.c | 4 +- src/inc/taosmsg.h | 1 - src/mnode/inc/mgmtConn.h | 36 --------- src/mnode/inc/mgmtDnode.h | 3 + src/mnode/inc/mgmtGrant.h | 1 + src/mnode/inc/mgmtMnode.h | 5 +- src/mnode/inc/mgmtProfile.h | 20 ++--- src/mnode/inc/mgmtShell.h | 2 - src/mnode/src/mgmtConn.c | 153 ------------------------------------ src/mnode/src/mgmtDnode.c | 3 + src/mnode/src/mgmtGrant.c | 19 +++-- src/mnode/src/mgmtMnode.c | 55 ++++++------- src/mnode/src/mgmtProfile.c | 139 +++++++++++++++++++++++++++++++- src/mnode/src/mgmtShell.c | 73 ++++++++++++++++- 14 files changed, 267 insertions(+), 247 deletions(-) delete mode 100644 src/mnode/inc/mgmtConn.h delete mode 100644 src/mnode/src/mgmtConn.c diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index d8273738c3..d0cb0a39af 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -30,12 +30,10 @@ void (*dnodeInitMgmtIpFp)() = NULL; int32_t (*dnodeInitMgmtFp)() = NULL; void (*dnodeCleanUpMgmtFp)() = NULL; - -void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; +void (*dnodeProcessStatusRspFp)(void *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; - static void *tsStatusTimer = NULL; static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a05bd20f84..7914c6cc86 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -595,7 +595,6 @@ typedef struct { typedef struct { int32_t code; - int32_t numOfVnodes; SDnodeState dnodeState; SRpcIpSet ipList; SVnodeAccess vnodeAccess[]; diff --git a/src/mnode/inc/mgmtConn.h b/src/mnode/inc/mgmtConn.h deleted file mode 100644 index 62dd5ebb42..0000000000 --- a/src/mnode/inc/mgmtConn.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_MGMT_CONN_H -#define TDENGINE_MGMT_CONN_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "mnode.h" - -int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); -int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn); - -bool mgmtCheckQhandle(uint64_t qhandle); -void mgmtSaveQhandle(void *qhandle); -void mgmtFreeQhandle(void *qhandle); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 4cdac1e7af..aafe45be5c 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -53,6 +53,9 @@ bool mgmtCheckConfigShow(SGlobalConfig *cfg); void mgmtSetDnodeUnRemove(SDnodeObj *pDnode); SDnodeObj* mgmtGetDnode(uint32_t ip); +extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip); +extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtGrant.h b/src/mnode/inc/mgmtGrant.h index 1cfc88f94a..01a5068bad 100644 --- a/src/mnode/inc/mgmtGrant.h +++ b/src/mnode/inc/mgmtGrant.h @@ -30,6 +30,7 @@ void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries); int32_t mgmtCheckTimeSeries(uint32_t timeseries); int32_t mgmtCheckUserGrant(); int32_t mgmtCheckDbGrant(); +int32_t mgmtCheckDnodeGrant(); int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn); diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index d012997d13..9132993994 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -24,7 +24,10 @@ extern "C" { #include #include "mnode.h" - int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtInitMnodes(); +void mgmtCleanUpMnodes(); + +int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 959f9e65ab..5af38a73b8 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -22,20 +22,22 @@ extern "C" { #include "mnode.h" -int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); - -int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); - -int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn); - -int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); +bool mgmtCheckQhandle(uint64_t qhandle); +void mgmtSaveQhandle(void *qhandle); +void mgmtFreeQhandle(void *qhandle); int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg); +int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtKillQuery(char *qidstr, void *pConn); - int32_t mgmtKillStream(char *qidstr, void *pConn); - int32_t mgmtKillConnection(char *qidstr, void *pConn); enum { diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index f14871b5b2..06b0068652 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -28,10 +28,8 @@ int32_t mgmtInitShell(); void mgmtCleanUpShell(); extern int32_t (*mgmtCheckRedirectMsg)(void *pConn); -extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); -extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); /* * If table not exist, will create it diff --git a/src/mnode/src/mgmtConn.c b/src/mnode/src/mgmtConn.c deleted file mode 100644 index 5d7b8ab27f..0000000000 --- a/src/mnode/src/mgmtConn.c +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "mgmtConn.h" -#include "taosmsg.h" -#include "tschemautil.h" - -typedef struct { - char user[TSDB_TABLE_ID_LEN]; - uint64_t stime; - uint32_t ip; - uint16_t port; -} SConnInfo; - -typedef struct { - int numOfConns; - int index; - SConnInfo connInfo[]; -} SConnShow; - -int mgmtGetConns(SShowObj *pShow, void *pConn) { -// SAcctObj * pAcct = pConn->pAcct; -// SConnShow *pConnShow; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow)); -// pConnShow->index = 0; -// pConnShow->numOfConns = 0; -// -// if (pAcct->acctInfo.numOfConns > 0) { -// pConn = pAcct->pConn; -// SConnInfo *pConnInfo = pConnShow->connInfo; -// -// while (pConn && pConn->pUser) { -// strcpy(pConnInfo->user, pConn->pUser->user); -// pConnInfo->ip = pConn->ip; -// pConnInfo->port = pConn->port; -// pConnInfo->stime = pConn->stime; -// -// pConnShow->numOfConns++; -// pConnInfo++; -// pConn = pConn->next; -// } -// } -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// // sorting based on useconds -// -// pShow->pNode = pConnShow; - - return 0; -} - -int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { - int cols = 0; - - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; - SSchema *pSchema = tsGetSchema(pMeta); - - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "user"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "ip:port"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "login time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htons(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - - pShow->numOfRows = 1000000; - pShow->pNode = NULL; - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - - mgmtGetConns(pShow, pConn); - return 0; -} - -int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn) { - int numOfRows = 0; - char *pWrite; - int cols = 0; - - SConnShow *pConnShow = (SConnShow *)pShow->pNode; - - if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index; - - while (numOfRows < rows) { - SConnInfo *pNode = pConnShow->connInfo + pConnShow->index; - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pNode->user); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - uint32_t ip = pNode->ip; - sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port)); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->stime; - cols++; - - numOfRows++; - pConnShow->index++; - } - - if (numOfRows == 0) { - tfree(pConnShow); - } - - pShow->numOfReads += numOfRows; - return numOfRows; -} - -bool mgmtCheckQhandle(uint64_t qhandle) { - return true; -} - -void mgmtSaveQhandle(void *qhandle) { -} - -void mgmtFreeQhandle(void *qhandle) { -} \ No newline at end of file diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 20043c3800..a28f337f69 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -33,6 +33,9 @@ void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL; int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL; void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL; +int32_t (*mgmtCreateDnodeFp)(uint32_t ip) = NULL; +int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip) = NULL; + static SDnodeObj tsDnodeObj = {0}; diff --git a/src/mnode/src/mgmtGrant.c b/src/mnode/src/mgmtGrant.c index c24fb82aa6..0ea212b86f 100644 --- a/src/mnode/src/mgmtGrant.c +++ b/src/mnode/src/mgmtGrant.c @@ -22,6 +22,7 @@ int32_t (*mgmtCheckUserGrantFp)() = NULL; int32_t (*mgmtCheckDbGrantFp)() = NULL; +int32_t (*mgmtCheckDnodeGrantFp)() = NULL; void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL; @@ -32,7 +33,7 @@ void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL; int32_t mgmtCheckUserGrant() { if (mgmtCheckUserGrantFp) { - return mgmtCheckUserGrantFp(); + return (*mgmtCheckUserGrantFp)(); } else { return 0; } @@ -40,7 +41,15 @@ int32_t mgmtCheckUserGrant() { int32_t mgmtCheckDbGrant() { if (mgmtCheckDbGrantFp) { - return mgmtCheckDbGrantFp(); + return (*mgmtCheckDbGrantFp)(); + } else { + return 0; + } +} + +int32_t mgmtCheckDnodeGrant() { + if (mgmtCheckDnodeGrantFp) { + return (*mgmtCheckDnodeGrantFp)(); } else { return 0; } @@ -49,20 +58,20 @@ int32_t mgmtCheckDbGrant() { void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries += timeSeriesNum; if (mgmtAddTimeSeriesFp) { - mgmtAddTimeSeriesFp(timeSeriesNum); + (*mgmtAddTimeSeriesFp)(timeSeriesNum); } } void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum; if (mgmtRestoreTimeSeriesFp) { - mgmtRestoreTimeSeriesFp(timeSeriesNum); + (*mgmtRestoreTimeSeriesFp)(timeSeriesNum); } } int32_t mgmtCheckTimeSeries(uint32_t timeseries) { if (mgmtCheckTimeSeriesFp) { - return mgmtCheckTimeSeriesFp(timeseries); + return (*mgmtCheckTimeSeriesFp)(timeseries); } else { return 0; } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 1c60312f3e..9cc796ec0f 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -18,12 +18,32 @@ #include "mgmtMnode.h" #include "mgmtUser.h" -void *(*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; int32_t (*mgmtInitMnodesFp)() = NULL; +void (*mgmtCleanUpMnodesFp)() = NULL; int32_t (*mgmtGetMnodesNumFp)() = NULL; +void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; -static int32_t mgmtGetMnodesNum(); -static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode); +static int32_t mgmtGetMnodesNum() { + if (mgmtGetMnodesNumFp) { + return (*mgmtGetMnodesNumFp)(); + } else { + return 1; + } +} + +static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { + if (mgmtGetNextMnodeFp) { + return (*mgmtGetNextMnodeFp)(pShow, pMnode); + } else { + if (*pMnode == NULL) { + *pMnode = NULL; + } else { + *pMnode = NULL; + } + } + + return *pMnode; +} int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; @@ -88,11 +108,8 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon char ipstr[20]; while (numOfRows < rows) { - pShow->pNode = mgmtGetNextMnode(pShow, (SDnodeObj **)&pMnode); - - -// pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode); -// if (pMnode == NULL) break; + pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode); + if (pMnode == NULL) break; cols = 0; @@ -123,25 +140,3 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon pShow->numOfReads += numOfRows; return numOfRows; } - -static int32_t mgmtGetMnodesNum() { - if (mgmtGetMnodesNumFp) { - return mgmtGetMnodesNumFp(); - } else { - return 1; - } -} - -static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { - if (mgmtGetNextMnodeFp) { - return mgmtGetNextMnodeFp(pShow, pMnode); - } else { - if (*pMnode == NULL) { - *pMnode = NULL; - } else { - *pMnode = NULL; - } - } - - return *pMnode; -} \ No newline at end of file diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 6e9a6e9c07..d8cc5af06b 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -15,16 +15,27 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "mnode.h" -#include "mgmtProfile.h" #include "taosmsg.h" #include "tschemautil.h" +#include "mgmtProfile.h" + +typedef struct { + char user[TSDB_TABLE_ID_LEN + 1]; + uint64_t stime; + uint32_t ip; + uint16_t port; +} SConnInfo; + +typedef struct { + int numOfConns; + int index; + SConnInfo connInfo[]; +} SConnShow; typedef struct { uint32_t ip; uint16_t port; - char user[TSDB_TABLE_ID_LEN]; + char user[TSDB_TABLE_ID_LEN+ 1]; } SCDesc; typedef struct { @@ -532,3 +543,123 @@ int32_t mgmtKillConnection(char *qidstr, void *pConn) { return TSDB_CODE_INVALID_CONNECTION; } + +bool mgmtCheckQhandle(uint64_t qhandle) { + return true; +} + +void mgmtSaveQhandle(void *qhandle) { +} + +void mgmtFreeQhandle(void *qhandle) { +} + +int mgmtGetConns(SShowObj *pShow, void *pConn) { + // SAcctObj * pAcct = pConn->pAcct; + // SConnShow *pConnShow; + // + // pthread_mutex_lock(&pAcct->mutex); + // + // pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow)); + // pConnShow->index = 0; + // pConnShow->numOfConns = 0; + // + // if (pAcct->acctInfo.numOfConns > 0) { + // pConn = pAcct->pConn; + // SConnInfo *pConnInfo = pConnShow->connInfo; + // + // while (pConn && pConn->pUser) { + // strcpy(pConnInfo->user, pConn->pUser->user); + // pConnInfo->ip = pConn->ip; + // pConnInfo->port = pConn->port; + // pConnInfo->stime = pConn->stime; + // + // pConnShow->numOfConns++; + // pConnInfo++; + // pConn = pConn->next; + // } + // } + // + // pthread_mutex_unlock(&pAcct->mutex); + // + // // sorting based on useconds + // + // pShow->pNode = pConnShow; + + return 0; +} + +int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + int32_t cols = 0; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; + SSchema *pSchema = tsGetSchema(pMeta); + + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "user"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip:port"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "login time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + + pShow->numOfRows = 1000000; + pShow->pNode = NULL; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + mgmtGetConns(pShow, pConn); + return 0; +} + +int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + char *pWrite; + int32_t cols = 0; + + SConnShow *pConnShow = (SConnShow *)pShow->pNode; + + if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index; + + while (numOfRows < rows) { + SConnInfo *pNode = pConnShow->connInfo + pConnShow->index; + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, pNode->user); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + uint32_t ip = pNode->ip; + sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port)); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pNode->stime; + cols++; + + numOfRows++; + pConnShow->index++; + } + + if (numOfRows == 0) { + tfree(pConnShow); + } + + pShow->numOfReads += numOfRows; + return numOfRows; +} diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 2aadd8963c..5c3b540916 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -26,7 +26,6 @@ #include "mgmtAcct.h" #include "mgmtBalance.h" #include "mgmtChildTable.h" -#include "mgmtConn.h" #include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtDnodeInt.h" @@ -1164,10 +1163,8 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); } -void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) { if (!mgmtAlterAcctFp) { @@ -1297,6 +1294,76 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle rpcSendResponse(ahandle, code, NULL, 0); } +static void mgmtProcessCreateDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { + if (!mgmtCreateDnodeFp) { + rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + return; + } + + SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCont; + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("failed to create dnode:%s, redirect this message", pCreate->ip); + return; + } + + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_INVALID_USER)); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return; + } + + if (strcmp(pUser->user, "root") != 0) { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_NO_RIGHTS)); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; + } + + int32_t code = (*mgmtCreateDnodeFp)(inet_addr(pCreate->ip)); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("dnode:%s is created by %s", pCreate->ip, pUser->user); + } else { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(code)); + } + + rpcSendResponse(ahandle, code, NULL, 0); +} + +static void mgmtProcessDropDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { + if (!mgmtDropDnodeByIpFp) { + rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + return; + } + + SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCont; + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("failed to drop dnode:%s, redirect this message", pDrop->ip); + return; + } + + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_INVALID_USER)); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return; + } + + if (strcmp(pUser->user, "root") != 0) { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_NO_RIGHTS)); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; + } + + int32_t code = (*mgmtDropDnodeByIpFp)(inet_addr(pDrop->ip)); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("dnode:%s set to removing state by %s", pDrop->ip, pUser->user); + } else { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(code)); + } + + rpcSendResponse(ahandle, code, NULL, 0); +} + void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; From 0d4618ce1b9dbfd7a4d7cb1786983b2b3af06ffc Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 5 Mar 2020 10:12:10 +0800 Subject: [PATCH 2/5] fix compile errors in cluster module --- src/dnode/inc/dnodeMgmt.h | 4 +--- src/dnode/src/dnodeMgmt.c | 4 ++-- src/inc/dnode.h | 2 -- src/inc/mnode.h | 1 + src/mnode/inc/mgmtDnode.h | 2 ++ src/mnode/src/mgmtDnode.c | 14 ++++++++++++++ 6 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 399b4b9920..7a67d7dbf2 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -26,15 +26,13 @@ extern "C" { int32_t dnodeInitMgmt(); void dnodeInitMgmtIp(); -void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); void dnodeSendVnodeCfgMsg(int32_t vnode); void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid); - - #ifdef __cplusplus } #endif diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index d0cb0a39af..1e7af8d094 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -171,13 +171,13 @@ void dnodeCleanUpMgmt() { } } -void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { dError("invalid msg type:%d", msgType); return; } - dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[msgType], pConn); + dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn); if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) { dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 15a6096826..ff893acd38 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -49,8 +49,6 @@ extern int32_t (*dnodeCheckSystem)(); extern void *tsDnodeMgmtQhandle; void dnodeCheckDataDirOpenned(const char* dir); -void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); - // dnodeModule extern void (*dnodeStartModules)(); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 34c8b8c77e..12a3fefe66 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -264,6 +264,7 @@ void mgmtCleanUpSystem(); void mgmtStopSystem(); void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); +void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index aafe45be5c..193d4544a1 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -50,6 +50,8 @@ int32_t mgmtGetDnodesNum(); int32_t mgmtUpdateDnode(SDnodeObj *pDnode); void* mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode); bool mgmtCheckConfigShow(SGlobalConfig *cfg); +bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode); +bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode); void mgmtSetDnodeUnRemove(SDnodeObj *pDnode); SDnodeObj* mgmtGetDnode(uint32_t ip); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index a28f337f69..8fea669ea0 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -636,3 +636,17 @@ bool mgmtCheckConfigShow(SGlobalConfig *cfg) { return false; return true; } + +/** + * check if a dnode in remove state + **/ +bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode) { + return pDnode->lbStatus == TSDB_DN_LB_STATUS_OFFLINE_REMOVING || pDnode->lbStatus == TSDB_DN_LB_STATE_SHELL_REMOVING; +} + +/** + * check if a dnode in offline state + **/ +bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) { + return pDnode->status == TSDB_DN_STATUS_OFFLINE; +} From a00be5e3e83452f3a572266bcc2462d9ba18775f Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 6 Mar 2020 17:06:02 +0800 Subject: [PATCH 3/5] refact code for cluster --- src/inc/sdb.h | 15 ------------- src/inc/taosmsg.h | 11 ---------- src/mnode/inc/mgmtBalance.h | 1 - src/mnode/inc/mgmtDnode.h | 2 ++ src/mnode/src/mgmtDb.c | 4 ++-- src/mnode/src/mgmtDnode.c | 3 +++ src/mnode/src/mgmtDnodeInt.c | 2 +- src/mnode/src/mgmtShell.c | 1 - src/mnode/src/mgmtSystem.c | 4 ---- src/mnode/src/mgmtVgroup.c | 21 ++++++++++++++++++ src/sdb/inc/sdbint.h | 41 +++++++++++++++--------------------- src/sdb/src/sdbEngine.c | 9 ++++---- src/sdb/src/sdbstr.c | 2 +- 13 files changed, 51 insertions(+), 65 deletions(-) diff --git a/src/inc/sdb.h b/src/inc/sdb.h index d0239522a9..4b4de1ac4b 100644 --- a/src/inc/sdb.h +++ b/src/inc/sdb.h @@ -25,18 +25,10 @@ extern "C" { extern uint16_t tsMgmtMgmtPort; extern uint16_t tsMgmtSyncPort; -extern int sdbMaxNodes; extern int tsMgmtPeerHBTimer; // seconds -extern char sdbZone[]; -extern char sdbMasterIp[]; -extern char sdbPrivateIp[]; extern char * sdbStatusStr[]; extern char * sdbRoleStr[]; -extern void * mnodeSdb; -extern int sdbExtConns; extern int sdbMaster; -extern uint32_t sdbPublicIp; -extern uint32_t sdbMasterStartTime; extern SRpcIpSet *pSdbIpList; extern SRpcIpSet *pSdbPublicIpList; @@ -89,14 +81,9 @@ typedef struct { // internal int syncFd; void *hbTimer; - void *thandle; void *pSync; } SSdbPeer; -SSdbPeer *sdbAddPeer(uint32_t ip, uint32_t publicIp, char role); - -void sdbUpdateIpList(); - extern SSdbPeer *sdbPeer[]; #define sdbInited (sdbPeer[0]) #define sdbStatus (sdbPeer[0]->status) @@ -130,8 +117,6 @@ int sdbInitPeers(char *directory); void sdbCleanUpPeers(); -int sdbCfgNode(char *cont); - int64_t sdbGetVersion(); int32_t sdbGetRunStatus(); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 7914c6cc86..bf03187010 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -600,17 +600,6 @@ typedef struct { SVnodeAccess vnodeAccess[]; } SStatusRsp; -// internal message -typedef struct { - uint32_t destId; - uint32_t destIp; - char tableId[TSDB_UNI_LEN + 1]; - char empty[3]; - uint8_t msgType; - int32_t msgLen; - uint8_t content[0]; -} SIntMsg; - typedef struct { char spi; char encrypt; diff --git a/src/mnode/inc/mgmtBalance.h b/src/mnode/inc/mgmtBalance.h index 61331b9b3d..7a6bb3a9aa 100644 --- a/src/mnode/inc/mgmtBalance.h +++ b/src/mnode/inc/mgmtBalance.h @@ -22,7 +22,6 @@ extern "C" { #include "mnode.h" -void mgmtStartBalanceTimer(int64_t mseconds); int32_t mgmtInitBalance(); void mgmtCleanupBalance(); int32_t mgmtAllocVnodes(SVgObj *pVgroup); diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 193d4544a1..6532c98612 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -58,6 +58,8 @@ SDnodeObj* mgmtGetDnode(uint32_t ip); extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip); extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip); +void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode); + #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index bedd51dcff..0186f51e16 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -455,9 +455,9 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { // // SVgObj *pVgroup = pDb->pHead; // while (pVgroup != NULL) { -// mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); +// balanceUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); // if (oldReplicaNum < pDb->cfg.replications) { -// if (!mgmtAddVnode(pVgroup, NULL, NULL)) { +// if (!balanceAddVnode(pVgroup, NULL, NULL)) { // mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); // code = TSDB_CODE_NO_ENOUGH_DNODES; // } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 8fea669ea0..a9c784eca9 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -609,6 +609,9 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) { int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { if (mgmtGetScoresMetaFp) { + SUserObj *pUser = mgmtGetUserFromConn(pConn); + if (pUser == NULL) return 0; + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; return mgmtGetScoresMetaFp(pMeta, pShow, pConn); } else { return TSDB_CODE_OPS_NOT_SUPPORT; diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 5a365f220a..100e76b10b 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -312,7 +312,7 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score); pDnode->customScore = score; mgmtUpdateDnode(pDnode); - mgmtStartBalanceTimer(15); + //mgmtStartBalanceTimer(15); } return TSDB_CODE_INVALID_SQL; } else if (strncasecmp(option, "bandwidth", 9) == 0) { diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 5c3b540916..a889c16f75 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -51,7 +51,6 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); -uint32_t mgmtAccessSquence = 0; void *tsShellConnServer = NULL; void mgmtProcessTranRequest(SSchedMsg *sched) { diff --git a/src/mnode/src/mgmtSystem.c b/src/mnode/src/mgmtSystem.c index e36788a5dd..bcb5f64a7b 100644 --- a/src/mnode/src/mgmtSystem.c +++ b/src/mnode/src/mgmtSystem.c @@ -61,10 +61,6 @@ int32_t mgmtCheckMgmtRunning() { tsetModuleStatus(TSDB_MOD_MGMT); -// strcpy(sdbMasterIp, mgmtIpStr[0]); -// strcpy(sdbPrivateIp, tsPrivateIp); -// sdbPublicIp = inet_addr(tsPublicIp); - return 0; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 10b4244bf8..b0ff80a819 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -318,6 +318,27 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { return 0; } +char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { + SDnodeObj *pDnode = mgmtGetDnode(pVnode->ip); + if (pDnode == NULL) { + mError("dnode:%s, vgroup:%d, vnode:%d dnode not exist", taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode); + return "null"; + } + + if (pDnode->status == TSDB_DN_STATUS_OFFLINE) { + return "offline"; + } + + SVnodeLoad *vload = pDnode->vload + pVnode->vnode; + if (vload->vgId != pVgroup->vgId || vload->vnode != pVnode->vnode) { + mError("dnode:%s, vgroup:%d, vnode:%d not same with dnode vgroup:%d vnode:%d", + taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode, vload->vgId, vload->vnode); + return "null"; + } + + return (char*)taosGetVnodeStatusStr(vload->status); +} + int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; SVgObj *pVgroup = NULL; diff --git a/src/sdb/inc/sdbint.h b/src/sdb/inc/sdbint.h index d0977f2e2f..8bb28b100e 100644 --- a/src/sdb/inc/sdbint.h +++ b/src/sdb/inc/sdbint.h @@ -48,6 +48,21 @@ #define sdbPrint(...) \ { tprintf("MND-SDB ", 255, __VA_ARGS__); } +#define mpeerError(...) \ + if (sdbDebugFlag & DEBUG_ERROR) { \ + tprintf("ERROR MND-MPEER ", 255, __VA_ARGS__); \ + } +#define mpeerWarn(...) \ + if (sdbDebugFlag & DEBUG_WARN) { \ + tprintf("WARN MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \ + } +#define mpeerTrace(...) \ + if (sdbDebugFlag & DEBUG_TRACE) { \ + tprintf("MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \ + } +#define mpeerPrint(...) \ + { tprintf("MND-MPEER ", 255, __VA_ARGS__); } + #define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__) #define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__) #define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__) @@ -69,11 +84,6 @@ typedef struct { char *row; } SSdbUpdate; -typedef struct { - char numOfTables; - uint64_t version[]; -} SSdbSync; - typedef struct { SSdbHeader header; int maxRows; @@ -109,23 +119,6 @@ typedef struct { char data[]; } SRowHead; -typedef struct { - char * buffer; - char * offset; - int trans; - int bufferSize; - pthread_mutex_t qmutex; -} STranQueue; - -typedef struct { - char status; - char role; - char numOfMnodes; - uint64_t dbVersion; - uint32_t numOfDnodes; - uint32_t publicIp; -} SMnodeStatus; - typedef struct { uint8_t dbId; char type; @@ -139,8 +132,8 @@ extern int sdbMaxPeers; extern int sdbNumOfTables; extern int64_t sdbVersion; -int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen); -int sdbRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); +int mpeerForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen); +int mpeerRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); void sdbResetTable(SSdbTable *pTable); extern const int16_t sdbFileVersion; diff --git a/src/sdb/src/sdbEngine.c b/src/sdb/src/sdbEngine.c index 4b000a30eb..0196ff084f 100644 --- a/src/sdb/src/sdbEngine.c +++ b/src/sdb/src/sdbEngine.c @@ -23,7 +23,6 @@ extern char version[]; const int16_t sdbFileVersion = 0; -int sdbExtConns = 0; SRpcIpSet *pSdbIpList = NULL; SRpcIpSet *pSdbPublicIpList = NULL; SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self @@ -431,7 +430,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) { + if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) { pTable->id++; sdbVersion++; if (pTable->keyType == SDB_KEYTYPE_AUTO) { @@ -548,7 +547,7 @@ int sdbDeleteRow(void *handle, void *row) { pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { + if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { pTable->id++; sdbVersion++; @@ -666,7 +665,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { + if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { pTable->id++; sdbVersion++; @@ -745,7 +744,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) { } pthread_mutex_lock(&pTable->mutex); - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_BATCH_UPDATE, row, rowSize) == 0) { + if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_BATCH_UPDATE, row, rowSize) == 0) { /* // write action */ /* write(pTable->fd, &action, sizeof(action)); */ /* pTable->size += sizeof(action); */ diff --git a/src/sdb/src/sdbstr.c b/src/sdb/src/sdbstr.c index 90bf2a3d43..6df779dff7 100644 --- a/src/sdb/src/sdbstr.c +++ b/src/sdb/src/sdbstr.c @@ -24,7 +24,7 @@ char* sdbRoleStr[] = {"unauthed", "undecided", "master", "slave", "null"}; /* * Lite Version sync request is always successful */ -int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen) { +int mpeerForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen) { return 0; } From f8d35c2a4b7a516d62e3f980c9287e8ab9d62b4a Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 6 Mar 2020 22:04:18 +0800 Subject: [PATCH 4/5] fix compile error --- src/dnode/src/dnodeModule.c | 32 ------------------ src/dnode/src/dnodeSystem.c | 24 +++++++++----- src/mnode/CMakeLists.txt | 2 +- src/mnode/inc/mgmtMnode.h | 4 +-- src/mnode/src/mgmtBalance.c | 8 ----- src/mnode/src/mgmtMnode.c | 20 +++++++++-- src/plugins/http/CMakeLists.txt | 2 +- src/plugins/http/src/httpSystem.c | 22 +++++++++---- src/sdb/CMakeLists.txt | 2 +- src/sdb/inc/sdbint.h | 4 +-- src/sdb/src/sdbEngine.c | 8 ++--- src/sdb/src/sdbstr.c | 55 ++++++++++++++++++++----------- 12 files changed, 94 insertions(+), 89 deletions(-) diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 96d0db5f6a..dd4678802f 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -68,38 +68,6 @@ void dnodeCleanUpModules() { } } -void dnodeProcessModuleStatus(uint32_t status) { - if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { - return; - } - - int news = status; - int olds = tsModuleStatus; - - for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { - int newStatus = news & (1 << moduleType); - int oldStatus = olds & (1 << moduleType); - - if (oldStatus > 0) { - if (newStatus == 0) { - if (tsModule[moduleType].stopFp) { - dPrint("module:%s is stopped on this node", tsModule[moduleType].name); - (*tsModule[moduleType].stopFp)(); - } - } - } else if (oldStatus == 0) { - if (newStatus > 0) { - if (tsModule[moduleType].startFp) { - dPrint("module:%s is started on this node", tsModule[moduleType].name); - (*tsModule[moduleType].startFp)(); - } - } - } else { - } - } - tsModuleStatus = status; -} - int32_t dnodeInitModules() { for (int mod = 0; mod < TSDB_MOD_MAX; ++mod) { if (tsModule[mod].num != 0 && tsModule[mod].initFp) { diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 67a1d42565..d6127574c6 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -33,12 +33,14 @@ #include "dnodeVnodeMgmt.h" #ifdef CLUSTER -//#include "acct.h" -//#include "admin.h" -//#include "cluster.h" -//#include "grant.h" -//#include "replica.h" -//#include "storage.h" +#include "account.h" +#include "admin.h" +#include "balance.h" +#include "cluster.h" +#include "grant.h" +#include "mpeer.h" +#include "storage.h" +#include "vpeer.h" #endif static pthread_mutex_t tsDnodeMutex; @@ -89,8 +91,6 @@ void dnodeCleanUpSystem() { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); } - - dnodeCleanupShell(); dnodeCleanUpModules(); dnodeCleanupVnodes(); @@ -112,7 +112,13 @@ void dnodeCheckDataDirOpenned(const char *dir) { void dnodeInitPlugins() { #ifdef CLUSTER - acctInit(); +// acctInit(); +// adminInit(); +// balanceInit(); +// clusterInit(); +// grantInit(); +// mpeerInit(); +// storageInit(); #endif } diff --git a/src/mnode/CMakeLists.txt b/src/mnode/CMakeLists.txt index 6bf4ef34e0..acf8b15f22 100644 --- a/src/mnode/CMakeLists.txt +++ b/src/mnode/CMakeLists.txt @@ -14,7 +14,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread) IF (TD_CLUSTER) - TARGET_LINK_LIBRARIES(mnode acct) + TARGET_LINK_LIBRARIES(mnode) ENDIF () ENDIF () diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index 9132993994..6e8e91ebc8 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -24,8 +24,8 @@ extern "C" { #include #include "mnode.h" -int32_t mgmtInitMnodes(); -void mgmtCleanUpMnodes(); +int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp); +int32_t mgmtRemoveMnode(uint32_t privateIp); int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 1e5fc54c5a..8bd9cf8933 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -77,11 +77,3 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { return 0; } } - -char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { - if (mgmtGetVnodeStatusFp) { - return (*mgmtGetVnodeStatusFp)(pVgroup, pVnode); - } else { - return "master"; - } -} diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 9cc796ec0f..b3b70d1d2e 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -18,11 +18,27 @@ #include "mgmtMnode.h" #include "mgmtUser.h" -int32_t (*mgmtInitMnodesFp)() = NULL; -void (*mgmtCleanUpMnodesFp)() = NULL; +int32_t (*mgmtAddMnodeFp)(uint32_t privateIp, uint32_t publicIp) = NULL; +int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL; int32_t (*mgmtGetMnodesNumFp)() = NULL; void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; +int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp) { + if (mgmtAddMnodeFp) { + return (*mgmtAddMnodeFp)(privateIp, publicIp); + } else { + return 0; + } +} + +int32_t mgmtRemoveMnode(uint32_t privateIp) { + if (mgmtRemoveMnodeFp) { + return (*mgmtRemoveMnodeFp)(privateIp); + } else { + return 0; + } +} + static int32_t mgmtGetMnodesNum() { if (mgmtGetMnodesNumFp) { return (*mgmtGetMnodesNumFp)(); diff --git a/src/plugins/http/CMakeLists.txt b/src/plugins/http/CMakeLists.txt index 9fd2999957..7044f5d09d 100644 --- a/src/plugins/http/CMakeLists.txt +++ b/src/plugins/http/CMakeLists.txt @@ -13,6 +13,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(http taos_static z) IF (TD_CLUSTER) - TARGET_LINK_LIBRARIES(http http_admin) + TARGET_LINK_LIBRARIES(http) ENDIF () ENDIF () diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 41b712048f..aa66af9825 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -34,13 +34,21 @@ #include "tgHandle.h" #include "tlog.h" -#ifdef CLUSTER - void adminInitHandle(HttpServer* pServer); - void opInitHandle(HttpServer* pServer); -#else - void adminInitHandle(HttpServer* pServer) {} - void opInitHandle(HttpServer* pServer) {} -#endif + +void (*adminInitHandleFp)(HttpServer* pServer) = NULL; +void (*opInitHandleFp)(HttpServer* pServer) = NULL; + +void adminInitHandle(HttpServer* pServer) { + if (adminInitHandleFp) { + (*adminInitHandleFp)(pServer); + } +} + +void opInitHandle(HttpServer* pServer) { + if (opInitHandleFp) { + (*opInitHandleFp)(pServer); + } +} static HttpServer *httpServer = NULL; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); diff --git a/src/sdb/CMakeLists.txt b/src/sdb/CMakeLists.txt index b0617353d9..47ea6e15b8 100644 --- a/src/sdb/CMakeLists.txt +++ b/src/sdb/CMakeLists.txt @@ -11,6 +11,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ADD_LIBRARY(sdb ${SRC}) TARGET_LINK_LIBRARIES(sdb trpc) IF (TD_CLUSTER) - TARGET_LINK_LIBRARIES(sdb mreplica) + TARGET_LINK_LIBRARIES(sdb) ENDIF() ENDIF () diff --git a/src/sdb/inc/sdbint.h b/src/sdb/inc/sdbint.h index 8bb28b100e..30ebe0909c 100644 --- a/src/sdb/inc/sdbint.h +++ b/src/sdb/inc/sdbint.h @@ -84,7 +84,7 @@ typedef struct { char *row; } SSdbUpdate; -typedef struct { +typedef struct _SSdbTable { SSdbHeader header; int maxRows; int dbId; @@ -132,7 +132,7 @@ extern int sdbMaxPeers; extern int sdbNumOfTables; extern int64_t sdbVersion; -int mpeerForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen); +int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen); int mpeerRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); void sdbResetTable(SSdbTable *pTable); extern const int16_t sdbFileVersion; diff --git a/src/sdb/src/sdbEngine.c b/src/sdb/src/sdbEngine.c index 0196ff084f..fbc41089d1 100644 --- a/src/sdb/src/sdbEngine.c +++ b/src/sdb/src/sdbEngine.c @@ -430,7 +430,7 @@ int64_t sdbInsertRow(void *handle, void *row, int rowSize) { pthread_mutex_lock(&pTable->mutex); - if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) { + if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) == 0) { pTable->id++; sdbVersion++; if (pTable->keyType == SDB_KEYTYPE_AUTO) { @@ -547,7 +547,7 @@ int sdbDeleteRow(void *handle, void *row) { pthread_mutex_lock(&pTable->mutex); - if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { + if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { pTable->id++; sdbVersion++; @@ -665,7 +665,7 @@ int sdbUpdateRow(void *handle, void *row, int updateSize, char isUpdated) { pthread_mutex_lock(&pTable->mutex); - if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { + if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { pTable->id++; sdbVersion++; @@ -744,7 +744,7 @@ int sdbBatchUpdateRow(void *handle, void *row, int rowSize) { } pthread_mutex_lock(&pTable->mutex); - if (mpeerForwardDbReqToPeer(pTable, SDB_TYPE_BATCH_UPDATE, row, rowSize) == 0) { + if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_BATCH_UPDATE, row, rowSize) == 0) { /* // write action */ /* write(pTable->fd, &action, sizeof(action)); */ /* pTable->size += sizeof(action); */ diff --git a/src/sdb/src/sdbstr.c b/src/sdb/src/sdbstr.c index 6df779dff7..59c01eb15a 100644 --- a/src/sdb/src/sdbstr.c +++ b/src/sdb/src/sdbstr.c @@ -12,32 +12,47 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - +#define _DEFAULT_SOURCE #include "sdbint.h" -char* sdbStatusStr[] = {"offline", "unsynced", "syncing", "serving", "null"}; +int32_t (*mpeerInitMnodesFp)(char *directory) = NULL; +void (*mpeerCleanUpMnodesFp)() = NULL; +int32_t (*mpeerForwardRequestFp)(SSdbTable *pTable, char type, void *cont, int32_t contLen) = NULL; -char* sdbRoleStr[] = {"unauthed", "undecided", "master", "slave", "null"}; +char *sdbStatusStr[] = { + "offline", + "unsynced", + "syncing", + "serving", + "null" +}; -#ifndef CLUSTER +char *sdbRoleStr[] = { + "unauthed", + "undecided", + "master", + "slave", + "null" +}; -/* - * Lite Version sync request is always successful - */ -int mpeerForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen) { - return 0; +int32_t sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int32_t dataLen) { + if (mpeerForwardRequestFp) { + return mpeerForwardRequestFp(pTable, type, data, dataLen); + } else { + return 0; + } } -/* - * Lite Version does not need to initialize peers - */ -int sdbInitPeers(char *directory) { - return 0; +int32_t sdbInitPeers(char *directory) { + if (mpeerInitMnodesFp) { + return (*mpeerInitMnodesFp)(directory); + } else { + return 0; + } } -/* - * Lite Version does not need to cleanup peers - */ -void sdbCleanUpPeers(){} - -#endif \ No newline at end of file +void sdbCleanUpPeers() { + if (mpeerCleanUpMnodesFp) { + (*mpeerCleanUpMnodesFp)(); + } +} From 3fec909265cf907375ded2e60603fc23d9c0c726 Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 6 Mar 2020 22:19:27 +0800 Subject: [PATCH 5/5] change cmake files --- src/CMakeLists.txt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 91670d3e7a..34768b00df 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,11 +4,11 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(os) ADD_SUBDIRECTORY(util) ADD_SUBDIRECTORY(rpc) -# ADD_SUBDIRECTORY(client) -# ADD_SUBDIRECTORY(kit) -# ADD_SUBDIRECTORY(plugins) -# ADD_SUBDIRECTORY(sdb) -# ADD_SUBDIRECTORY(mnode) +ADD_SUBDIRECTORY(client) +ADD_SUBDIRECTORY(kit) +ADD_SUBDIRECTORY(plugins) +ADD_SUBDIRECTORY(sdb) +ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(vnode) -# ADD_SUBDIRECTORY(dnode) +ADD_SUBDIRECTORY(dnode) #ADD_SUBDIRECTORY(connector/jdbc)