refactor mgmtShell and mgmt
This commit is contained in:
parent
0fcdc41948
commit
fcde340200
|
@ -1802,7 +1802,7 @@ int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pCmd->msgType = TSDB_MSG_TYPE_DNODE_CFG;
|
||||
pCmd->msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
|||
COMMAND echo "make test directory"
|
||||
DEPENDS taosd
|
||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
|
||||
COMMAND ${CMAKE_COMMAND} -E make_directoryF ${TD_TESTS_OUTPUT_DIR}/log/
|
||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
|
||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
|
||||
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||
|
|
|
@ -52,6 +52,7 @@ int32_t dnodeInitMClient() {
|
|||
void dnodeCleanupMClient() {
|
||||
if (tsDnodeMClientRpc) {
|
||||
rpcClose(tsDnodeMClientRpc);
|
||||
tsDnodeMClientRpc = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
#include "tlog.h"
|
||||
#include "trpc.h"
|
||||
#include "tstatus.h"
|
||||
#include "tsdb.h"
|
||||
//#include "tsdb.h"
|
||||
#include "dnodeMgmt.h"
|
||||
#include "dnodeRead.h"
|
||||
#include "dnodeWrite.h"
|
||||
|
@ -44,19 +44,19 @@ static int32_t dnodeOpenVnodes();
|
|||
static void dnodeCleanupVnodes();
|
||||
static int32_t dnodeOpenVnode(int32_t vgId);
|
||||
static void dnodeCleanupVnode(SVnodeObj *pVnode);
|
||||
static int32_t dnodeCreateVnode(SCreateVnodeMsg *cfg);
|
||||
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg);
|
||||
static void dnodeDropVnode(SVnodeObj *pVnode);
|
||||
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
|
||||
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
|
||||
static void dnodeProcesSMDCreateVnodeMsg(SRpcMsg *pMsg);
|
||||
static void dnodeProcesSMDDropVnodeMsg(SRpcMsg *pMsg);
|
||||
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
|
||||
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
|
||||
|
||||
static void * tsDnodeVnodesHash = NULL;
|
||||
|
||||
int32_t dnodeInitMgmt() {
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_VNODE] = dnodeProcessDropVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcesSMDCreateVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcesSMDDropVnodeMsg;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
|
||||
|
||||
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
|
||||
if (tsDnodeVnodesHash == NULL) {
|
||||
|
@ -139,117 +139,117 @@ static void dnodeCleanupVnodes() {
|
|||
}
|
||||
|
||||
static int32_t dnodeOpenVnode(int32_t vgId) {
|
||||
char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||
sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId);
|
||||
|
||||
void *pTsdb = tsdbOpenRepo(rootDir);
|
||||
if (pTsdb != NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SVnodeObj vnodeObj;
|
||||
vnodeObj.vgId = vgId;
|
||||
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
|
||||
vnodeObj.refCount = 1;
|
||||
vnodeObj.version = 0;
|
||||
vnodeObj.wworker = dnodeAllocateWriteWorker();
|
||||
vnodeObj.rworker = dnodeAllocateReadWorker();
|
||||
vnodeObj.wal = NULL;
|
||||
vnodeObj.tsdb = pTsdb;
|
||||
vnodeObj.replica = NULL;
|
||||
vnodeObj.events = NULL;
|
||||
vnodeObj.cq = NULL;
|
||||
|
||||
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
|
||||
// char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||
// sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId);
|
||||
//
|
||||
// void *pTsdb = tsdbOpenRepo(rootDir);
|
||||
// if (pTsdb != NULL) {
|
||||
// return terrno;
|
||||
// }
|
||||
//
|
||||
// SVnodeObj vnodeObj;
|
||||
// vnodeObj.vgId = vgId;
|
||||
// vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
|
||||
// vnodeObj.refCount = 1;
|
||||
// vnodeObj.version = 0;
|
||||
// vnodeObj.wworker = dnodeAllocateWriteWorker();
|
||||
// vnodeObj.rworker = dnodeAllocateReadWorker();
|
||||
// vnodeObj.wal = NULL;
|
||||
// vnodeObj.tsdb = pTsdb;
|
||||
// vnodeObj.replica = NULL;
|
||||
// vnodeObj.events = NULL;
|
||||
// vnodeObj.cq = NULL;
|
||||
//
|
||||
// taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void dnodeCleanupVnode(SVnodeObj *pVnode) {
|
||||
pVnode->status = TSDB_VN_STATUS_NOT_READY;
|
||||
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||
if (count > 0) {
|
||||
// wait refcount
|
||||
}
|
||||
|
||||
// remove replica
|
||||
|
||||
// remove read queue
|
||||
dnodeFreeReadWorker(pVnode->rworker);
|
||||
pVnode->rworker = NULL;
|
||||
|
||||
// remove write queue
|
||||
dnodeFreeWriteWorker(pVnode->wworker);
|
||||
pVnode->wworker = NULL;
|
||||
|
||||
// remove wal
|
||||
|
||||
// remove tsdb
|
||||
if (pVnode->tsdb) {
|
||||
tsdbCloseRepo(pVnode->tsdb);
|
||||
pVnode->tsdb = NULL;
|
||||
}
|
||||
|
||||
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||
// pVnode->status = TSDB_VN_STATUS_NOT_READY;
|
||||
// int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||
// if (count > 0) {
|
||||
// // wait refcount
|
||||
// }
|
||||
//
|
||||
// // remove replica
|
||||
//
|
||||
// // remove read queue
|
||||
// dnodeFreeReadWorker(pVnode->rworker);
|
||||
// pVnode->rworker = NULL;
|
||||
//
|
||||
// // remove write queue
|
||||
// dnodeFreeWriteWorker(pVnode->wworker);
|
||||
// pVnode->wworker = NULL;
|
||||
//
|
||||
// // remove wal
|
||||
//
|
||||
// // remove tsdb
|
||||
// if (pVnode->tsdb) {
|
||||
// tsdbCloseRepo(pVnode->tsdb);
|
||||
// pVnode->tsdb = NULL;
|
||||
// }
|
||||
//
|
||||
// taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||
}
|
||||
|
||||
static int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnodeCfg) {
|
||||
STsdbCfg tsdbCfg;
|
||||
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
||||
tsdbCfg.tsdbId = pVnodeCfg->vnode;
|
||||
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
|
||||
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
|
||||
tsdbCfg.minRowsPerFileBlock = -1;
|
||||
tsdbCfg.maxRowsPerFileBlock = -1;
|
||||
tsdbCfg.keep = -1;
|
||||
tsdbCfg.maxCacheSize = -1;
|
||||
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||
// STsdbCfg tsdbCfg;
|
||||
// tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
||||
// tsdbCfg.tsdbId = pVnodeCfg->vnode;
|
||||
// tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
|
||||
// tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
|
||||
// tsdbCfg.minRowsPerFileBlock = -1;
|
||||
// tsdbCfg.maxRowsPerFileBlock = -1;
|
||||
// tsdbCfg.keep = -1;
|
||||
// tsdbCfg.maxCacheSize = -1;
|
||||
|
||||
char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||
sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId);
|
||||
|
||||
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
|
||||
if (pTsdb != NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SVnodeObj vnodeObj;
|
||||
vnodeObj.vgId = pVnodeCfg->cfg.vgId;
|
||||
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
|
||||
vnodeObj.refCount = 1;
|
||||
vnodeObj.version = 0;
|
||||
vnodeObj.wworker = dnodeAllocateWriteWorker();
|
||||
vnodeObj.rworker = dnodeAllocateReadWorker();
|
||||
vnodeObj.wal = NULL;
|
||||
vnodeObj.tsdb = pTsdb;
|
||||
vnodeObj.replica = NULL;
|
||||
vnodeObj.events = NULL;
|
||||
vnodeObj.cq = NULL;
|
||||
|
||||
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
|
||||
// char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||
// sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId);
|
||||
//
|
||||
// void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
|
||||
// if (pTsdb != NULL) {
|
||||
// return terrno;
|
||||
// }
|
||||
//
|
||||
// SVnodeObj vnodeObj;
|
||||
// vnodeObj.vgId = pVnodeCfg->cfg.vgId;
|
||||
// vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
|
||||
// vnodeObj.refCount = 1;
|
||||
// vnodeObj.version = 0;
|
||||
// vnodeObj.wworker = dnodeAllocateWriteWorker();
|
||||
// vnodeObj.rworker = dnodeAllocateReadWorker();
|
||||
// vnodeObj.wal = NULL;
|
||||
// vnodeObj.tsdb = pTsdb;
|
||||
// vnodeObj.replica = NULL;
|
||||
// vnodeObj.events = NULL;
|
||||
// vnodeObj.cq = NULL;
|
||||
//
|
||||
// taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void dnodeDropVnode(SVnodeObj *pVnode) {
|
||||
pVnode->status = TSDB_VN_STATUS_NOT_READY;
|
||||
|
||||
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||
if (count > 0) {
|
||||
// wait refcount
|
||||
}
|
||||
|
||||
if (pVnode->tsdb) {
|
||||
tsdbDropRepo(pVnode->tsdb);
|
||||
pVnode->tsdb = NULL;
|
||||
}
|
||||
|
||||
dnodeCleanupVnode(pVnode);
|
||||
// pVnode->status = TSDB_VN_STATUS_NOT_READY;
|
||||
//
|
||||
// int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||
// if (count > 0) {
|
||||
// // wait refcount
|
||||
// }
|
||||
//
|
||||
// if (pVnode->tsdb) {
|
||||
// tsdbDropRepo(pVnode->tsdb);
|
||||
// pVnode->tsdb = NULL;
|
||||
// }
|
||||
//
|
||||
// dnodeCleanupVnode(pVnode);
|
||||
}
|
||||
|
||||
static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
static void dnodeProcesSMDCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
SCreateVnodeMsg *pCreate = (SCreateVnodeMsg *) rpcMsg->pCont;
|
||||
SMDCreateVnodeMsg *pCreate = (SMDCreateVnodeMsg *) rpcMsg->pCont;
|
||||
pCreate->vnode = htonl(pCreate->vnode);
|
||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
||||
|
@ -266,10 +266,10 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
|||
rpcFreeCont(rpcMsg->pCont);
|
||||
}
|
||||
|
||||
static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
static void dnodeProcesSMDDropVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
SDropVnodeMsg *pDrop = (SCreateVnodeMsg *) rpcMsg->pCont;
|
||||
SMDDropVnodeMsg *pDrop = (SMDCreateVnodeMsg *) rpcMsg->pCont;
|
||||
pDrop->vgId = htonl(pDrop->vgId);
|
||||
|
||||
SVnodeObj *pVnodeObj = taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId);
|
||||
|
@ -287,7 +287,7 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
|
|||
static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
|
||||
SCreateVnodeMsg *pCreate = (SCreateVnodeMsg *) rpcMsg->pCont;
|
||||
SMDCreateVnodeMsg *pCreate = (SMDCreateVnodeMsg *) rpcMsg->pCont;
|
||||
pCreate->vnode = htonl(pCreate->vnode);
|
||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
||||
|
|
|
@ -26,9 +26,9 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg);
|
|||
static void *tsDnodeMnodeRpc = NULL;
|
||||
|
||||
int32_t dnodeInitMnode() {
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeWrite;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeWrite;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_VNODE] = dnodeMgmt;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeWrite;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite;
|
||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt;
|
||||
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
|
|
|
@ -61,8 +61,8 @@ SWriteWorkerPool wWorkerPool;
|
|||
|
||||
int32_t dnodeInitWrite() {
|
||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg;
|
||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableMsg;
|
||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessDropTableMsg;
|
||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeProcessCreateTableMsg;
|
||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg;
|
||||
|
||||
wWorkerPool.max = tsNumOfCores;
|
||||
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
|
||||
|
|
|
@ -263,8 +263,6 @@ int32_t mgmtStartSystem();
|
|||
void mgmtCleanUpSystem();
|
||||
void mgmtStopSystem();
|
||||
|
||||
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
|
||||
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -37,31 +37,36 @@ extern "C" {
|
|||
#define TSDB_MSG_TYPE_QUERY_RSP 6
|
||||
#define TSDB_MSG_TYPE_RETRIEVE 7
|
||||
#define TSDB_MSG_TYPE_RETRIEVE_RSP 8
|
||||
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9
|
||||
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10
|
||||
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11
|
||||
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12
|
||||
|
||||
// dnodeMgmt
|
||||
#define TSDB_MSG_TYPE_CREATE_VNODE 13
|
||||
#define TSDB_MSG_TYPE_CREATE_VNODE_RSP 14
|
||||
#define TSDB_MSG_TYPE_DROP_VNODE 15
|
||||
#define TSDB_MSG_TYPE_DROP_VNODE_RSP 16
|
||||
#define TSDB_MSG_TYPE_ALTER_VNODE 17
|
||||
#define TSDB_MSG_TYPE_ALTER_VNODE_RSP 18
|
||||
#define TSDB_MSG_TYPE_CONFIG_VNODE 19
|
||||
#define TSDB_MSG_TYPE_CONFIG_VNODE_RSP 20
|
||||
// message from mgmt to dnode
|
||||
#define TSDB_MSG_TYPE_MD_CREATE_TABLE 9
|
||||
#define TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP 10
|
||||
#define TSDB_MSG_TYPE_MD_DROP_TABLE 11
|
||||
#define TSDB_MSG_TYPE_MD_DROP_TABLE_RSP 12
|
||||
#define TSDB_MSG_TYPE_MD_ALTER_TABLE 13
|
||||
#define TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP 14
|
||||
#define TSDB_MSG_TYPE_MD_CREATE_VNODE 15
|
||||
#define TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP 16
|
||||
#define TSDB_MSG_TYPE_MD_DROP_VNODE 17
|
||||
#define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18
|
||||
#define TSDB_MSG_TYPE_MD_ALTER_VNODE 19
|
||||
#define TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP 20
|
||||
#define TSDB_MSG_TYPE_MD_DROP_STABLE 21
|
||||
#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 22
|
||||
#define TSDB_MSG_TYPE_MD_ALTER_STREAM 23
|
||||
#define TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP 24
|
||||
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25
|
||||
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26
|
||||
|
||||
|
||||
#define TSDB_MSG_TYPE_DM_CONFIG_VNODE 19
|
||||
#define TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP 20
|
||||
|
||||
|
||||
#define TSDB_MSG_TYPE_DNODE_CFG 17
|
||||
#define TSDB_MSG_TYPE_DNODE_CFG_RSP 18
|
||||
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19
|
||||
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 20
|
||||
#define TSDB_MSG_TYPE_SDB_SYNC 21
|
||||
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22
|
||||
#define TSDB_MSG_TYPE_SDB_FORWARD 23
|
||||
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24
|
||||
#define TSDB_MSG_TYPE_DROP_STABLE 25
|
||||
#define TSDB_MSG_TYPE_DROP_STABLE_RSP 26
|
||||
#define TSDB_MSG_TYPE_CONNECT 31
|
||||
#define TSDB_MSG_TYPE_CONNECT_RSP 32
|
||||
#define TSDB_MSG_TYPE_CREATE_ACCT 33
|
||||
|
@ -259,7 +264,7 @@ typedef struct {
|
|||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||
char superTableId[TSDB_TABLE_ID_LEN + 1];
|
||||
char data[];
|
||||
} SDCreateTableMsg;
|
||||
} SDMCreateTableMsg;
|
||||
|
||||
typedef struct {
|
||||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||
|
@ -341,7 +346,7 @@ typedef struct {
|
|||
uint64_t uid;
|
||||
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS];
|
||||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||
} SDRemoveTableMsg;
|
||||
} SMDDropTableMsg;
|
||||
|
||||
typedef struct {
|
||||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||
|
@ -350,7 +355,8 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
} SDropVnodeMsg;
|
||||
int32_t vnode;
|
||||
} SMDDropVnodeMsg;
|
||||
|
||||
typedef struct SColIndexEx {
|
||||
int16_t colId;
|
||||
|
@ -622,7 +628,7 @@ typedef struct {
|
|||
int32_t vnode;
|
||||
SVnodeCfg cfg;
|
||||
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS];
|
||||
} SCreateVnodeMsg;
|
||||
} SMDCreateVnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||
|
@ -751,7 +757,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
char ip[32];
|
||||
char config[64];
|
||||
} SCfgDnodeMsg;
|
||||
} SMDCfgDnodeMsg, SCMCfgDnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
char sql[TSDB_SHOW_SQL_LEN + 1];
|
||||
|
|
|
@ -31,7 +31,7 @@ void mgmtCleanUpChildTables();
|
|||
void * mgmtGetChildTable(char *tableId);
|
||||
|
||||
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
|
||||
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
|
||||
SDMCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
|
||||
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
|
||||
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
|
||||
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_MGMT_DCLIENT_H
|
||||
#define TDENGINE_MGMT_DCLIENT_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t mgmtInitDClient();
|
||||
void mgmtCleanupDClient();
|
||||
void mgmtAddDClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_MGMT_DSERVER_H
|
||||
#define TDENGINE_MGMT_DSERVER_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t mgmtInitDServer();
|
||||
void mgmtCleanupDServer();
|
||||
void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
||||
|
||||
|
||||
//extern void *mgmtStatusTimer;
|
||||
//
|
||||
//void mgmtSendCreateTableMsg(SDMCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle);
|
||||
//void mgmtSendDropTableMsg(SMDDropTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle);
|
||||
//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
|
||||
//void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
||||
//void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
||||
//void mgmtSendDropVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
||||
//void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
||||
//
|
||||
//int32_t mgmtInitDnodeInt();
|
||||
//void mgmtCleanUpDnodeInt();
|
||||
//
|
||||
//void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle);
|
||||
//void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
|
||||
//void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -1,48 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_MGMT_DNODE_INT_H
|
||||
#define TDENGINE_MGMT_DNODE_INT_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include "mnode.h"
|
||||
|
||||
extern void *mgmtStatusTimer;
|
||||
|
||||
void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle);
|
||||
void mgmtSendRemoveTableMsg(SDRemoveTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle);
|
||||
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
|
||||
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
||||
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
||||
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
|
||||
void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle);
|
||||
|
||||
int32_t mgmtInitDnodeInt();
|
||||
void mgmtCleanUpDnodeInt();
|
||||
|
||||
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle);
|
||||
void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
|
||||
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -24,6 +24,9 @@ extern "C" {
|
|||
#include <stdbool.h>
|
||||
#include "mnode.h"
|
||||
|
||||
|
||||
bool mgmtCheckRedirect(void *handle);
|
||||
|
||||
int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp);
|
||||
int32_t mgmtRemoveMnode(uint32_t privateIp);
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ void mgmtCleanUpNormalTables();
|
|||
void * mgmtGetNormalTable(char *tableId);
|
||||
|
||||
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
|
||||
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
|
||||
SDMCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
|
||||
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
|
||||
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
|
||||
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
|
||||
|
|
|
@ -19,30 +19,33 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include "mnode.h"
|
||||
|
||||
int32_t mgmtInitShell();
|
||||
int32_t mgmtInitShell();
|
||||
void mgmtCleanUpShell();
|
||||
void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
||||
|
||||
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn);
|
||||
typedef int32_t (*SShowMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp);
|
||||
void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp);
|
||||
|
||||
/*
|
||||
* If table not exist, will create it
|
||||
*/
|
||||
void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
|
||||
|
||||
/*
|
||||
* If vgroup not exist, will create vgroup
|
||||
*/
|
||||
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
|
||||
|
||||
/*
|
||||
* If vgroup create returned, will then create table
|
||||
*/
|
||||
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
|
||||
//extern int32_t (*mgmtCheckRedirect)(void *pConn);
|
||||
//
|
||||
///*
|
||||
// * If table not exist, will create it
|
||||
// */
|
||||
//void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
|
||||
//
|
||||
///*
|
||||
// * If vgroup not exist, will create vgroup
|
||||
// */
|
||||
//void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
|
||||
//
|
||||
///*
|
||||
// * If vgroup create returned, will then create table
|
||||
// */
|
||||
//void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
|
|||
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
|
||||
void mgmtSetTableDirty(STableInfo *pTable, bool isDirty);
|
||||
|
||||
SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
|
||||
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
|
||||
SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
|
||||
|
||||
|
||||
|
|
|
@ -19,20 +19,11 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include "mnode.h"
|
||||
|
||||
int32_t mgmtInitUsers();
|
||||
SUserObj *mgmtGetUser(char *name);
|
||||
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
||||
int32_t mgmtDropUser(SAcctObj *pAcct, char *name);
|
||||
int32_t mgmtUpdateUser(SUserObj *pUser);
|
||||
int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
void mgmtCleanUpUsers();
|
||||
SUserObj *mgmtGetUserFromConn(void *pConn);
|
||||
SUserObj *mgmtGetUser(char *name);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
|
|||
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable);
|
||||
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable);
|
||||
|
||||
SCreateVnodeMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode);
|
||||
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode);
|
||||
|
||||
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup);
|
||||
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip);
|
||||
|
|
|
@ -187,13 +187,13 @@ int32_t mgmtRetrieveAccts(SShowObj *pShow, char *data, int32_t rows, void *pConn
|
|||
}
|
||||
|
||||
SAcctObj *mgmtGetAcctFromConn(void *pConn) {
|
||||
SRpcConnInfo connInfo;
|
||||
rpcGetConnInfo(pConn, &connInfo);
|
||||
|
||||
SUserObj *pUser = mgmtGetUser(connInfo.user);
|
||||
if (pUser != NULL) {
|
||||
return pUser->pAcct;
|
||||
}
|
||||
// SRpcConnInfo connInfo;
|
||||
// rpcGetConnInfo(pConn, &connInfo);
|
||||
//
|
||||
// SUserObj *pUser = mgmtGetUser(connInfo.user);
|
||||
// if (pUser != NULL) {
|
||||
// return pUser->pAcct;
|
||||
// }
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@
|
|||
#include "mgmtAcct.h"
|
||||
#include "mgmtChildTable.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnodeInt.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtProfile.h"
|
||||
#include "mgmtSuperTable.h"
|
||||
|
@ -273,9 +272,9 @@ void mgmtCleanUpChildTables() {
|
|||
|
||||
static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) {
|
||||
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
|
||||
int32_t contLen = sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
|
||||
int32_t contLen = sizeof(SDMCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
|
||||
|
||||
SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
|
||||
SDMCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
|
||||
if (pCreateTable == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -308,13 +307,13 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou
|
|||
pSchema++;
|
||||
}
|
||||
|
||||
memcpy(pCreateTable + sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen);
|
||||
memcpy(pCreateTable + sizeof(SDMCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen);
|
||||
|
||||
return pCreateTable;
|
||||
}
|
||||
|
||||
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
|
||||
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
|
||||
SDMCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
|
||||
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
|
||||
if (numOfTables >= tsMaxTables) {
|
||||
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables);
|
||||
|
@ -371,7 +370,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
|
|||
return TSDB_CODE_OTHERS;
|
||||
}
|
||||
|
||||
SDRemoveTableMsg *pRemove = rpcMallocCont(sizeof(SDRemoveTableMsg));
|
||||
SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg));
|
||||
if (pRemove == NULL) {
|
||||
mError("table:%s, failed to drop child table, no enough memory", pTable->tableId);
|
||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
|
@ -388,7 +387,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
|
|||
}
|
||||
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
||||
mgmtSendRemoveTableMsg(pRemove, &ipSet, NULL);
|
||||
mgmtSendDropTableMsg(pRemove, &ipSet, NULL);
|
||||
|
||||
if (sdbDeleteRow(tsChildTableSdb, pTable) < 0) {
|
||||
mError("table:%s, update ctables sdb error", pTable->tableId);
|
||||
|
|
|
@ -0,0 +1,313 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tsched.h"
|
||||
#include "tstatus.h"
|
||||
#include "tsystem.h"
|
||||
#include "tutil.h"
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtBalance.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnode.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtProfile.h"
|
||||
#include "mgmtShell.h"
|
||||
#include "mgmtTable.h"
|
||||
#include "mgmtVgroup.h"
|
||||
|
||||
static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg);
|
||||
static void (*mgmtProcessDnodeRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
|
||||
static void *tsMgmtDClientRpc = NULL;
|
||||
|
||||
int32_t mgmtInitDClient() {
|
||||
SRpcInit rpcInit = {0};
|
||||
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "MND-DC";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = mgmtProcessRspFromDnode;
|
||||
rpcInit.sessions = tsMaxDnodes * 5;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.user = "mgmtDClient";
|
||||
rpcInit.ckey = "key";
|
||||
rpcInit.secret = "secret";
|
||||
|
||||
tsMgmtDClientRpc = rpcOpen(&rpcInit);
|
||||
if (tsMgmtDClientRpc == NULL) {
|
||||
mError("failed to init client connection to dnode");
|
||||
return -1;
|
||||
}
|
||||
|
||||
mPrint("client connection to dnode is opened");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mgmtCleanupDClient() {
|
||||
if (tsMgmtDClientRpc) {
|
||||
rpcClose(tsMgmtDClientRpc);
|
||||
tsMgmtDClientRpc = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtAddDClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
|
||||
mgmtProcessDnodeRspFp[msgType] = fp;
|
||||
}
|
||||
|
||||
static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
|
||||
if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) {
|
||||
(*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg);
|
||||
} else {
|
||||
dError("%s is not processed", taosMsg[rpcMsg->msgType]);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
}
|
||||
|
||||
//static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg);
|
||||
//static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg);
|
||||
//static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
|
||||
//static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
|
||||
//static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg);
|
||||
//static void mgmtProcessAlterVnodeRsp(SRpcMsg *rpcMsg);
|
||||
//static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg);
|
||||
//static void mgmtProcessAlterStreamRsp(SRpcMsg *rpcMsg);
|
||||
//static void mgmtProcessConfigDnodeRsp(SRpcMsg *rpcMsg);
|
||||
|
||||
//
|
||||
//static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
|
||||
// mTrace("create table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
|
||||
// if (rpcMsg->handle == NULL) return;
|
||||
//
|
||||
// SProcessInfo *info = rpcMsg->handle;
|
||||
// assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META);
|
||||
//
|
||||
// STableInfo *pTable = info->ahandle;
|
||||
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
||||
// mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId, rpcMsg->code);
|
||||
// mgmtSetTableDirty(pTable, true);
|
||||
// } else {
|
||||
// mTrace("table:%s, created in dnode", pTable->tableId);
|
||||
// mgmtSetTableDirty(pTable, false);
|
||||
// }
|
||||
//
|
||||
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
||||
// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = rpcMsg->code, .msgType = 0};
|
||||
// rpcSendResponse(&rpcMsg);
|
||||
// } else {
|
||||
// if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
|
||||
// mTrace("table:%s, start to process get meta", pTable->tableId);
|
||||
// mgmtProcessGetTableMeta(pTable, rpcMsg->handle);
|
||||
// } else {
|
||||
// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
// rpcSendResponse(&rpcMsg);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// free(info);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
|
||||
// mTrace("drop table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
||||
// mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
||||
// mTrace("create vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
|
||||
// if (rpcMsg->handle == NULL) return;
|
||||
//
|
||||
// SProcessInfo *info = rpcMsg->handle;
|
||||
// assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META);
|
||||
//
|
||||
// info->received++;
|
||||
// SVgObj *pVgroup = info->ahandle;
|
||||
//
|
||||
// bool isGetMeta = false;
|
||||
// if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) {
|
||||
// isGetMeta = true;
|
||||
// }
|
||||
//
|
||||
// mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes);
|
||||
// if (info->received == pVgroup->numOfVnodes) {
|
||||
// mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta);
|
||||
// free(info);
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
||||
// mTrace("drop vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessAlterVnodeRsp(SRpcMsg *rpcMsg) {
|
||||
// mTrace("alter vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) {
|
||||
// mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessAlterStreamRsp(SRpcMsg *rpcMsg) {
|
||||
// mTrace("alter stream rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessConfigDnodeRsp(SRpcMsg *rpcMsg) {
|
||||
// mTrace("config dnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendCreateTableMsg(SDMCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle);
|
||||
// SRpcMsg rpcMsg = {
|
||||
// .handle = ahandle,
|
||||
// .pCont = pCreate,
|
||||
// .contLen = htonl(pCreate->contLen),
|
||||
// .code = 0,
|
||||
// .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
|
||||
// };
|
||||
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendDropTableMsg(SMDDropTableMsg *pDrop, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("table:%s, send drop table msg, ahandle:%p", pDrop->tableId, ahandle);
|
||||
// SRpcMsg rpcMsg = {
|
||||
// .handle = ahandle,
|
||||
// .pCont = pDrop,
|
||||
// .contLen = sizeof(SMDDropTableMsg),
|
||||
// .code = 0,
|
||||
// .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
|
||||
// };
|
||||
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
|
||||
// SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup, vnode);
|
||||
// SRpcMsg rpcMsg = {
|
||||
// .handle = ahandle,
|
||||
// .pCont = pCreate,
|
||||
// .contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0,
|
||||
// .code = 0,
|
||||
// .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE
|
||||
// };
|
||||
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||
// mTrace("vgroup:%d, send create all vnodes msg, handle:%p", pVgroup->vgId, ahandle);
|
||||
// for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
|
||||
// mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("table:%s, send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendDropVnodeMsg(int32_t vgId, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle);
|
||||
// SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg));
|
||||
// SRpcMsg rpcMsg = {
|
||||
// .handle = ahandle,
|
||||
// .pCont = pDrop,
|
||||
// .contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0,
|
||||
// .code = 0,
|
||||
// .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE
|
||||
// };
|
||||
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||
// mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
// for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
|
||||
// mgmtSendDropVnodeMsg(pVgroup->vgId, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
|
||||
// }
|
||||
//}
|
||||
////
|
||||
////int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
|
||||
//// char *option, *value;
|
||||
//// int32_t olen, valen;
|
||||
////
|
||||
//// paGetToken(msg, &option, &olen);
|
||||
//// if (strncasecmp(option, "unremove", 8) == 0) {
|
||||
//// mgmtSetDnodeUnRemove(pDnode);
|
||||
//// return TSDB_CODE_SUCCESS;
|
||||
//// } else if (strncasecmp(option, "score", 5) == 0) {
|
||||
//// paGetToken(option + olen + 1, &value, &valen);
|
||||
//// if (valen > 0) {
|
||||
//// int32_t score = atoi(value);
|
||||
//// mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score);
|
||||
//// pDnode->customScore = score;
|
||||
//// mgmtUpdateDnode(pDnode);
|
||||
//// //mgmtStartBalanceTimer(15);
|
||||
//// }
|
||||
//// return TSDB_CODE_INVALID_SQL;
|
||||
//// } else if (strncasecmp(option, "bandwidth", 9) == 0) {
|
||||
//// paGetToken(msg, &value, &valen);
|
||||
//// if (valen > 0) {
|
||||
//// int32_t bandwidthMb = atoi(value);
|
||||
//// if (bandwidthMb >= 0 && bandwidthMb < 10000000) {
|
||||
//// mTrace("dnode:%s, bandwidth(Mb) set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->bandwidthMb, bandwidthMb);
|
||||
//// pDnode->bandwidthMb = bandwidthMb;
|
||||
//// mgmtUpdateDnode(pDnode);
|
||||
//// return TSDB_CODE_SUCCESS;
|
||||
//// }
|
||||
//// }
|
||||
//// return TSDB_CODE_INVALID_SQL;
|
||||
//// }
|
||||
////
|
||||
//// return -1;
|
||||
////}
|
||||
////
|
||||
////int32_t mgmtSendCfgDnodeMsg(char *cont) {
|
||||
//// SDnodeObj *pDnode;
|
||||
//// SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont;
|
||||
//// uint32_t ip;
|
||||
////
|
||||
//// ip = inet_addr(pCfg->ip);
|
||||
//// pDnode = mgmtGetDnode(ip);
|
||||
//// if (pDnode == NULL) {
|
||||
//// mError("dnode ip:%s not configured", pCfg->ip);
|
||||
//// return TSDB_CODE_NOT_CONFIGURED;
|
||||
//// }
|
||||
////
|
||||
//// mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config);
|
||||
//// int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config);
|
||||
//// if (code != -1) {
|
||||
//// return code;
|
||||
//// }
|
||||
////
|
||||
////#ifdef CLUSTER
|
||||
//// pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_MD_CONFIG_DNODE);
|
||||
//// if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE;
|
||||
//// pMsg = pStart;
|
||||
////
|
||||
//// memcpy(pMsg, cont, sizeof(SCfgDnodeMsg));
|
||||
//// pMsg += sizeof(SCfgDnodeMsg);
|
||||
////
|
||||
//// msgLen = pMsg - pStart;
|
||||
//// mgmtSendMsgToDnode(pDnode, pStart, msgLen);
|
||||
////#else
|
||||
//// (void)tsCfgDynamicOptions(pCfg->config);
|
||||
////#endif
|
||||
//// return 0;
|
||||
////}
|
|
@ -0,0 +1,453 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "trpc.h"
|
||||
#include "tsched.h"
|
||||
#include "tstatus.h"
|
||||
#include "tsystem.h"
|
||||
#include "tutil.h"
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtBalance.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDServer.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtProfile.h"
|
||||
#include "mgmtShell.h"
|
||||
#include "mgmtTable.h"
|
||||
#include "mgmtVgroup.h"
|
||||
|
||||
|
||||
static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg);
|
||||
static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
|
||||
static void (*mgmtProcessDnodeMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
|
||||
static void *tsMgmtDServerRpc;
|
||||
|
||||
int32_t mgmtInitDServer() {
|
||||
SRpcInit rpcInit = {0};
|
||||
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;;
|
||||
rpcInit.localPort = tsMgmtDnodePort;
|
||||
rpcInit.label = "MND-DS";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = mgmtProcessMsgFromDnode;
|
||||
rpcInit.sessions = tsMaxDnodes * 5;
|
||||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.afp = mgmtDServerRetrieveAuth;
|
||||
|
||||
tsMgmtDServerRpc = rpcOpen(&rpcInit);
|
||||
if (tsMgmtDServerRpc == NULL) {
|
||||
mError("failed to init server connection to dnode");
|
||||
return -1;
|
||||
}
|
||||
|
||||
mPrint("server connection to dnode is opened");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mgmtCleanupDServer() {
|
||||
if (tsMgmtDServerRpc) {
|
||||
rpcClose(tsMgmtDServerRpc);
|
||||
tsMgmtDServerRpc = NULL;
|
||||
mPrint("server connection to dnode is closed");
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
|
||||
mgmtProcessDnodeMsgFp[msgType] = fp;
|
||||
}
|
||||
|
||||
static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
|
||||
if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) {
|
||||
(*mgmtProcessDnodeMsgFp[rpcMsg->msgType])(rpcMsg);
|
||||
} else {
|
||||
mError("%s is not processed", taosMsg[rpcMsg->msgType]);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
}
|
||||
|
||||
static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//
|
||||
//
|
||||
//static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) {
|
||||
// STableCfgMsg *pCfg = (STableCfgMsg *) pCont;
|
||||
// pCfg->dnode = htonl(pCfg->dnode);
|
||||
// pCfg->vnode = htonl(pCfg->vnode);
|
||||
// pCfg->sid = htonl(pCfg->sid);
|
||||
// mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
||||
//
|
||||
// if (!sdbMaster) {
|
||||
// mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
||||
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid);
|
||||
// if (pTable == NULL) {
|
||||
// mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
||||
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
|
||||
//
|
||||
// //TODO
|
||||
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
|
||||
// mgmtSendCreateTableMsg(NULL, &ipSet, NULL);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) {
|
||||
// if (!sdbMaster) {
|
||||
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *) pCont;
|
||||
// pCfg->dnode = htonl(pCfg->dnode);
|
||||
// pCfg->vnode = htonl(pCfg->vnode);
|
||||
//
|
||||
// SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode);
|
||||
// if (pVgroup == NULL) {
|
||||
// mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode);
|
||||
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_NOT_ACTIVE_VNODE, NULL, 0);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
|
||||
//
|
||||
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
|
||||
// mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
// mTrace("create table rsp received, thandle:%p code:%d", thandle, code);
|
||||
// if (thandle == NULL) return;
|
||||
//
|
||||
// SProcessInfo *info = thandle;
|
||||
// assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META);
|
||||
// STableInfo *pTable = info->ahandle;
|
||||
//
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId);
|
||||
// mgmtSetTableDirty(pTable, true);
|
||||
// } else {
|
||||
// mTrace("table:%s, created in dnode", pTable->tableId);
|
||||
// mgmtSetTableDirty(pTable, false);
|
||||
// }
|
||||
//
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// SRpcMsg rpcMsg = {0};
|
||||
// rpcMsg.code = code;
|
||||
// rpcMsg.handle = info->thandle;
|
||||
// rpcSendResponse(&rpcMsg);
|
||||
// } else {
|
||||
// if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
|
||||
// mTrace("table:%s, start to process get meta", pTable->tableId);
|
||||
// mgmtProcessGetTableMeta(pTable, thandle);
|
||||
// } else {
|
||||
// SRpcMsg rpcMsg = {0};
|
||||
// rpcMsg.code = code;
|
||||
// rpcMsg.handle = info->thandle;
|
||||
// rpcSendResponse(&rpcMsg);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// free(info);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendCreateTableMsg(SDMCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle);
|
||||
// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
// mTrace("remove table rsp received, thandle:%p code:%d", thandle, code);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendDropTableMsg(SMDDropTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("table:%s, sid:%d send remove table msg, ahandle:%p", pRemove->tableId, htonl(pRemove->sid), ahandle);
|
||||
// if (pRemove != NULL) {
|
||||
// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_DROP_TABLE, pRemove, sizeof(SMDDropTableMsg), ahandle);
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessDropVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
// mTrace("free vnode rsp received, thandle:%p code:%d", thandle, code);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessDropStableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
// mTrace("drop stable rsp received, thandle:%p code:%d", thandle, code);
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
// mTrace("create vnode rsp received, thandle:%p code:%d", thandle, code);
|
||||
// if (thandle == NULL) return;
|
||||
//
|
||||
// SProcessInfo *info = thandle;
|
||||
// assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META);
|
||||
// info->received++;
|
||||
// SVgObj *pVgroup = info->ahandle;
|
||||
//
|
||||
// bool isGetMeta = false;
|
||||
// if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) {
|
||||
// isGetMeta = true;
|
||||
// }
|
||||
//
|
||||
// mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes);
|
||||
// if (info->received == pVgroup->numOfVnodes) {
|
||||
// mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta);
|
||||
// free(info);
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||
// mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
// for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
|
||||
// mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
|
||||
// SMDCreateVnodeMsg *pVpeer = mgmtBuildCreateVnodeMsg(pVgroup, vnode);
|
||||
// if (pVpeer != NULL) {
|
||||
// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_CREATE_VNODE, pVpeer, sizeof(SMDCreateVnodeMsg), ahandle);
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//static void mgmtProcessDnodeGrantMsg(void *pCont, void *thandle) {
|
||||
// if (mgmtUpdateGrantInfoFp) {
|
||||
// mgmtUpdateGrantInfoFp(pCont);
|
||||
// mTrace("grant info is updated");
|
||||
// }
|
||||
//
|
||||
// SRpcMsg rpcMsg = {0};
|
||||
// rpcMsg.code = TSDB_CODE_SUCCESS;
|
||||
// rpcMsg.handle = thandle;
|
||||
// rpcSendResponse(&rpcMsg);
|
||||
//}
|
||||
//
|
||||
//void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
|
||||
// if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
|
||||
// mError("invalid msg type:%d", msgType);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn);
|
||||
//
|
||||
// if (msgType == TSDB_MSG_TYPE_TABLE_CFG) {
|
||||
// mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn);
|
||||
// } else if (msgType == TSDB_MSG_TYPE_DM_CONFIG_VNODE) {
|
||||
// mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn);
|
||||
// } else if (msgType == TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP) {
|
||||
// mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code);
|
||||
// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_TABLE_RSP) {
|
||||
// mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code);
|
||||
// } else if (msgType == TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP) {
|
||||
// mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code);
|
||||
// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_VNODE_RSP) {
|
||||
// mgmtProcessDropVnodeRsp(msgType, pCont, contLen, pConn, code);
|
||||
// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
|
||||
// mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code);
|
||||
// } else if (msgType == TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP) {
|
||||
// } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
|
||||
// } else if (msgType == TSDB_MSG_TYPE_STATUS) {
|
||||
// mgmtProcessDnodeStatus(msgType, pCont, contLen, pConn, code);
|
||||
// } else if (msgType == TSDB_MSG_TYPE_GRANT) {
|
||||
// mgmtProcessDnodeGrantMsg(pCont, pConn);
|
||||
// } else {
|
||||
// mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]);
|
||||
// }
|
||||
//
|
||||
// //rpcFreeCont(pCont);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle);
|
||||
//}
|
||||
//
|
||||
//void mgmtSendDropVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
|
||||
// mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle);
|
||||
//
|
||||
// SMDDropVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SMDDropVnodeMsg));
|
||||
// if (pFreeVnode != NULL) {
|
||||
// pFreeVnode->vnode = htonl(vnode);
|
||||
// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_DROP_VNODE, pFreeVnode, sizeof(SMDDropVnodeMsg), ahandle);
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||
// mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
//
|
||||
// for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
|
||||
// mgmtSendDropVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
|
||||
// char *option, *value;
|
||||
// int32_t olen, valen;
|
||||
//
|
||||
// paGetToken(msg, &option, &olen);
|
||||
// if (strncasecmp(option, "unremove", 8) == 0) {
|
||||
// mgmtSetDnodeUnRemove(pDnode);
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// } else if (strncasecmp(option, "score", 5) == 0) {
|
||||
// paGetToken(option + olen + 1, &value, &valen);
|
||||
// if (valen > 0) {
|
||||
// int32_t score = atoi(value);
|
||||
// mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score);
|
||||
// pDnode->customScore = score;
|
||||
// mgmtUpdateDnode(pDnode);
|
||||
// //mgmtStartBalanceTimer(15);
|
||||
// }
|
||||
// return TSDB_CODE_INVALID_SQL;
|
||||
// } else if (strncasecmp(option, "bandwidth", 9) == 0) {
|
||||
// paGetToken(msg, &value, &valen);
|
||||
// if (valen > 0) {
|
||||
// int32_t bandwidthMb = atoi(value);
|
||||
// if (bandwidthMb >= 0 && bandwidthMb < 10000000) {
|
||||
// mTrace("dnode:%s, bandwidth(Mb) set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->bandwidthMb, bandwidthMb);
|
||||
// pDnode->bandwidthMb = bandwidthMb;
|
||||
// mgmtUpdateDnode(pDnode);
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
// }
|
||||
// return TSDB_CODE_INVALID_SQL;
|
||||
// }
|
||||
//
|
||||
// return -1;
|
||||
//}
|
||||
//
|
||||
//int32_t mgmtSendCfgDnodeMsg(char *cont) {
|
||||
////#ifdef CLUSTER
|
||||
//// char * pMsg, *pStart;
|
||||
//// int32_t msgLen = 0;
|
||||
////#endif
|
||||
////
|
||||
//// SDnodeObj *pDnode;
|
||||
//// SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont;
|
||||
//// uint32_t ip;
|
||||
////
|
||||
//// ip = inet_addr(pCfg->ip);
|
||||
//// pDnode = mgmtGetDnode(ip);
|
||||
//// if (pDnode == NULL) {
|
||||
//// mError("dnode ip:%s not configured", pCfg->ip);
|
||||
//// return TSDB_CODE_NOT_CONFIGURED;
|
||||
//// }
|
||||
////
|
||||
//// mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config);
|
||||
//// int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config);
|
||||
//// if (code != -1) {
|
||||
//// return code;
|
||||
//// }
|
||||
////
|
||||
////#ifdef CLUSTER
|
||||
//// pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_MD_CONFIG_DNODE);
|
||||
//// if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE;
|
||||
//// pMsg = pStart;
|
||||
////
|
||||
//// memcpy(pMsg, cont, sizeof(SCfgDnodeMsg));
|
||||
//// pMsg += sizeof(SCfgDnodeMsg);
|
||||
////
|
||||
//// msgLen = pMsg - pStart;
|
||||
//// mgmtSendMsgToDnode(pDnode, pStart, msgLen);
|
||||
////#else
|
||||
//// (void)tsCfgDynamicOptions(pCfg->config);
|
||||
////#endif
|
||||
// return 0;
|
||||
//}
|
||||
//
|
||||
//int32_t mgmtInitDnodeInt() {
|
||||
// if (mgmtInitDnodeIntFp) {
|
||||
// return mgmtInitDnodeIntFp();
|
||||
// } else {
|
||||
// return 0;
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//void mgmtCleanUpDnodeInt() {
|
||||
// if (mgmtCleanUpDnodeIntFp) {
|
||||
// mgmtCleanUpDnodeIntFp();
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
|
||||
// SStatusMsg *pStatus = (SStatusMsg *)pCont;
|
||||
//
|
||||
// SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp));
|
||||
// if (pObj == NULL) {
|
||||
// mError("dnode:%s not exist", taosIpStr(pObj->privateIp));
|
||||
// mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_DNODE_NOT_EXIST, NULL, 0);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// pObj->lastReboot = htonl(pStatus->lastReboot);
|
||||
// pObj->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
|
||||
// pObj->openVnodes = htons(pStatus->openVnodes);
|
||||
// pObj->numOfCores = htons(pStatus->numOfCores);
|
||||
// pObj->diskAvailable = pStatus->diskAvailable;
|
||||
// pObj->alternativeRole = pStatus->alternativeRole;
|
||||
////
|
||||
//// if (mgmtProcessDnodeStatusFp) {
|
||||
//// mgmtProcessDnodeStatusFp(pStatus, pObj, pConn);
|
||||
//// return;
|
||||
//// }
|
||||
//
|
||||
// pObj->status = TSDB_DN_STATUS_READY;
|
||||
//
|
||||
//// // wait vnode dropped
|
||||
//// for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) {
|
||||
//// SVnodeLoad *pVload = &(pObj->vload[vnode]);
|
||||
//// if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
|
||||
//// bool existInDnode = false;
|
||||
//// for (int32_t j = 0; j < pObj->openVnodes; ++j) {
|
||||
//// if (htonl(pStatus->load[j].vnode) == vnode) {
|
||||
//// existInDnode = true;
|
||||
//// break;
|
||||
//// }
|
||||
//// }
|
||||
////
|
||||
//// if (!existInDnode) {
|
||||
//// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
|
||||
//// pVload->status = TSDB_VN_STATUS_OFFLINE;
|
||||
//// mgmtUpdateDnode(pObj);
|
||||
//// mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode);
|
||||
//// taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr);
|
||||
//// }
|
||||
//// } else if (pVload->vgId == 0) {
|
||||
//// /*
|
||||
//// * In some cases, vnode information may be reported abnormally, recover it
|
||||
//// */
|
||||
//// if (pVload->dropStatus != TSDB_VN_DROP_STATUS_READY || pVload->status != TSDB_VN_STATUS_OFFLINE) {
|
||||
//// mPrint("dnode:%s, vid:%d, vgroup:%d status:%s dropStatus:%s, set it to avail status",
|
||||
//// taosIpStr(pObj->privateIp), vnode, pVload->vgId, taosGetVnodeStatusStr(pVload->status),
|
||||
//// taosGetVnodeDropStatusStr(pVload->dropStatus));
|
||||
//// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
|
||||
//// pVload->status = TSDB_VN_STATUS_OFFLINE;
|
||||
//// mgmtUpdateDnode(pObj);
|
||||
//// }
|
||||
//// }
|
||||
//// }
|
||||
//}
|
|
@ -23,7 +23,6 @@
|
|||
#include "mgmtBalance.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnode.h"
|
||||
#include "mgmtDnodeInt.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtTable.h"
|
||||
#include "mgmtUser.h"
|
||||
|
@ -295,7 +294,7 @@ int32_t mgmtSetDbDropping(SDbObj *pDb) {
|
|||
}
|
||||
}
|
||||
}
|
||||
// mgmtSendRemoveVgroupMsg(pVgroup);
|
||||
// mgmtSendDropVgroupMsg(pVgroup);
|
||||
pVgroup = pVgroup->next;
|
||||
}
|
||||
|
||||
|
@ -355,7 +354,7 @@ int32_t mgmtDropDb(SDbObj *pDb) {
|
|||
if (!finished) {
|
||||
SVgObj *pVgroup = pDb->pHead;
|
||||
while (pVgroup != NULL) {
|
||||
mgmtSendRemoveVgroupMsg(pVgroup, NULL);
|
||||
mgmtSendDropVgroupMsg(pVgroup, NULL);
|
||||
pVgroup = pVgroup->next;
|
||||
}
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
|
|
@ -1,455 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tsched.h"
|
||||
#include "tstatus.h"
|
||||
#include "tsystem.h"
|
||||
#include "tutil.h"
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtBalance.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnode.h"
|
||||
#include "mgmtDnodeInt.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtProfile.h"
|
||||
#include "mgmtShell.h"
|
||||
#include "mgmtTable.h"
|
||||
#include "mgmtVgroup.h"
|
||||
|
||||
int32_t (*mgmtInitDnodeIntFp)() = NULL;
|
||||
void (*mgmtCleanUpDnodeIntFp)() = NULL;
|
||||
|
||||
void (*mgmtSendMsgToDnodeFp)(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) = NULL;
|
||||
void (*mgmtSendRspToDnodeFp)(void *handle, int32_t code, void *pCont, int32_t contLen) = NULL;
|
||||
void *mgmtStatusTimer = NULL;
|
||||
|
||||
static void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
|
||||
|
||||
static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) {
|
||||
int32_t contLen = *(int32_t *) (sched->msg - 4);
|
||||
int32_t code = *(int32_t *) (sched->msg - 8);
|
||||
int8_t msgType = *(int8_t *) (sched->msg - 9);
|
||||
void *ahandle = sched->ahandle;
|
||||
int8_t *pCont = sched->msg;
|
||||
|
||||
// dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code);
|
||||
}
|
||||
|
||||
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) {
|
||||
mTrace("msg:%d:%s is sent to dnode, ahandle:%p", msgType, taosMsg[msgType], ahandle);
|
||||
if (mgmtSendMsgToDnodeFp) {
|
||||
mgmtSendMsgToDnodeFp(ipSet, msgType, pCont, contLen, ahandle);
|
||||
} else {
|
||||
if (pCont == NULL) {
|
||||
pCont = rpcMallocCont(1);
|
||||
contLen = 0;
|
||||
}
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = mgmtSendMsgToDnodeQueueFp;
|
||||
schedMsg.msg = pCont;
|
||||
schedMsg.ahandle = ahandle;
|
||||
*(int32_t *) (pCont - 4) = contLen;
|
||||
*(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS;
|
||||
*(int8_t *) (pCont - 9) = msgType;
|
||||
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) {
|
||||
mTrace("rsp:%d:%s is sent to dnode", msgType, taosMsg[msgType]);
|
||||
if (mgmtSendRspToDnodeFp) {
|
||||
mgmtSendRspToDnodeFp(pConn, code, pCont, contLen);
|
||||
} else {
|
||||
if (pCont == NULL) {
|
||||
pCont = rpcMallocCont(1);
|
||||
contLen = 0;
|
||||
}
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = mgmtSendMsgToDnodeQueueFp;
|
||||
schedMsg.msg = pCont;
|
||||
*(int32_t *) (pCont - 4) = contLen;
|
||||
*(int32_t *) (pCont - 8) = code;
|
||||
*(int8_t *) (pCont - 9) = msgType;
|
||||
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle) {
|
||||
STableCfgMsg *pCfg = (STableCfgMsg *) pCont;
|
||||
pCfg->dnode = htonl(pCfg->dnode);
|
||||
pCfg->vnode = htonl(pCfg->vnode);
|
||||
pCfg->sid = htonl(pCfg->sid);
|
||||
mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
||||
|
||||
if (!sdbMaster) {
|
||||
mError("dnode:%s, vnode:%d, sid:%d, not master, redirect it", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
||||
mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
STableInfo *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid);
|
||||
if (pTable == NULL) {
|
||||
mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
||||
mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_INVALID_TABLE, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
|
||||
|
||||
//TODO
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
|
||||
mgmtSendCreateTableMsg(NULL, &ipSet, NULL);
|
||||
}
|
||||
|
||||
static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) {
|
||||
if (!sdbMaster) {
|
||||
mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_REDIRECT, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
SVpeerCfgMsg *pCfg = (SVpeerCfgMsg *) pCont;
|
||||
pCfg->dnode = htonl(pCfg->dnode);
|
||||
pCfg->vnode = htonl(pCfg->vnode);
|
||||
|
||||
SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode);
|
||||
if (pVgroup == NULL) {
|
||||
mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode);
|
||||
mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_NOT_ACTIVE_VNODE, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
|
||||
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
|
||||
mgmtSendCreateVnodeMsg(pVgroup, pCfg->vnode, &ipSet, NULL);
|
||||
}
|
||||
|
||||
static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
mTrace("create table rsp received, thandle:%p code:%d", thandle, code);
|
||||
if (thandle == NULL) return;
|
||||
|
||||
SProcessInfo *info = thandle;
|
||||
assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META);
|
||||
STableInfo *pTable = info->ahandle;
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId);
|
||||
mgmtSetTableDirty(pTable, true);
|
||||
} else {
|
||||
mTrace("table:%s, created in dnode", pTable->tableId);
|
||||
mgmtSetTableDirty(pTable, false);
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.code = code;
|
||||
rpcMsg.handle = info->thandle;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
} else {
|
||||
if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
|
||||
mTrace("table:%s, start to process get meta", pTable->tableId);
|
||||
mgmtProcessGetTableMeta(pTable, thandle);
|
||||
} else {
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.code = code;
|
||||
rpcMsg.handle = info->thandle;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
}
|
||||
}
|
||||
|
||||
free(info);
|
||||
}
|
||||
|
||||
void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
|
||||
mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle);
|
||||
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle);
|
||||
}
|
||||
|
||||
static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
mTrace("remove table rsp received, thandle:%p code:%d", thandle, code);
|
||||
}
|
||||
|
||||
void mgmtSendRemoveTableMsg(SDRemoveTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle) {
|
||||
mTrace("table:%s, sid:%d send remove table msg, ahandle:%p", pRemove->tableId, htonl(pRemove->sid), ahandle);
|
||||
if (pRemove != NULL) {
|
||||
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_REMOVE_TABLE, pRemove, sizeof(SDRemoveTableMsg), ahandle);
|
||||
}
|
||||
}
|
||||
|
||||
static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
mTrace("free vnode rsp received, thandle:%p code:%d", thandle, code);
|
||||
}
|
||||
|
||||
static void mgmtProcessDropStableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
mTrace("drop stable rsp received, thandle:%p code:%d", thandle, code);
|
||||
}
|
||||
|
||||
static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
|
||||
mTrace("create vnode rsp received, thandle:%p code:%d", thandle, code);
|
||||
if (thandle == NULL) return;
|
||||
|
||||
SProcessInfo *info = thandle;
|
||||
assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META);
|
||||
info->received++;
|
||||
SVgObj *pVgroup = info->ahandle;
|
||||
|
||||
bool isGetMeta = false;
|
||||
if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) {
|
||||
isGetMeta = true;
|
||||
}
|
||||
|
||||
mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes);
|
||||
if (info->received == pVgroup->numOfVnodes) {
|
||||
mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta);
|
||||
free(info);
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||
mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
|
||||
mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
|
||||
mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
|
||||
SCreateVnodeMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode);
|
||||
if (pVpeer != NULL) {
|
||||
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_CREATE_VNODE, pVpeer, sizeof(SCreateVnodeMsg), ahandle);
|
||||
}
|
||||
}
|
||||
|
||||
static void mgmtProcessDnodeGrantMsg(void *pCont, void *thandle) {
|
||||
if (mgmtUpdateGrantInfoFp) {
|
||||
mgmtUpdateGrantInfoFp(pCont);
|
||||
mTrace("grant info is updated");
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.code = TSDB_CODE_SUCCESS;
|
||||
rpcMsg.handle = thandle;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
}
|
||||
|
||||
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
|
||||
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
|
||||
mError("invalid msg type:%d", msgType);
|
||||
return;
|
||||
}
|
||||
|
||||
mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn);
|
||||
|
||||
if (msgType == TSDB_MSG_TYPE_TABLE_CFG) {
|
||||
mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn);
|
||||
} else if (msgType == TSDB_MSG_TYPE_CONFIG_VNODE) {
|
||||
mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn);
|
||||
} else if (msgType == TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP) {
|
||||
mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code);
|
||||
} else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) {
|
||||
mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code);
|
||||
} else if (msgType == TSDB_MSG_TYPE_CREATE_VNODE_RSP) {
|
||||
mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code);
|
||||
} else if (msgType == TSDB_MSG_TYPE_DROP_VNODE_RSP) {
|
||||
mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code);
|
||||
} else if (msgType == TSDB_MSG_TYPE_DROP_STABLE) {
|
||||
mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code);
|
||||
} else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) {
|
||||
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
|
||||
} else if (msgType == TSDB_MSG_TYPE_STATUS) {
|
||||
mgmtProcessDnodeStatus(msgType, pCont, contLen, pConn, code);
|
||||
} else if (msgType == TSDB_MSG_TYPE_GRANT) {
|
||||
mgmtProcessDnodeGrantMsg(pCont, pConn);
|
||||
} else {
|
||||
mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]);
|
||||
}
|
||||
|
||||
//rpcFreeCont(pCont);
|
||||
}
|
||||
|
||||
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
|
||||
mTrace("table:%s, sid:%d send alter stream msg, ahandle:%p", pTable->tableId, pTable->sid, ahandle);
|
||||
}
|
||||
|
||||
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
|
||||
mTrace("vnode:%d send free vnode msg, ahandle:%p", vnode, ahandle);
|
||||
|
||||
SDropVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SDropVnodeMsg));
|
||||
if (pFreeVnode != NULL) {
|
||||
pFreeVnode->vnode = htonl(vnode);
|
||||
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DROP_VNODE, pFreeVnode, sizeof(SDropVnodeMsg), ahandle);
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtSendRemoveVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||
mTrace("vgroup:%d send free vgroup msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
|
||||
mgmtSendOneFreeVnodeMsg(pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
|
||||
char *option, *value;
|
||||
int32_t olen, valen;
|
||||
|
||||
paGetToken(msg, &option, &olen);
|
||||
if (strncasecmp(option, "unremove", 8) == 0) {
|
||||
mgmtSetDnodeUnRemove(pDnode);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (strncasecmp(option, "score", 5) == 0) {
|
||||
paGetToken(option + olen + 1, &value, &valen);
|
||||
if (valen > 0) {
|
||||
int32_t score = atoi(value);
|
||||
mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score);
|
||||
pDnode->customScore = score;
|
||||
mgmtUpdateDnode(pDnode);
|
||||
//mgmtStartBalanceTimer(15);
|
||||
}
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
} else if (strncasecmp(option, "bandwidth", 9) == 0) {
|
||||
paGetToken(msg, &value, &valen);
|
||||
if (valen > 0) {
|
||||
int32_t bandwidthMb = atoi(value);
|
||||
if (bandwidthMb >= 0 && bandwidthMb < 10000000) {
|
||||
mTrace("dnode:%s, bandwidth(Mb) set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->bandwidthMb, bandwidthMb);
|
||||
pDnode->bandwidthMb = bandwidthMb;
|
||||
mgmtUpdateDnode(pDnode);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t mgmtSendCfgDnodeMsg(char *cont) {
|
||||
//#ifdef CLUSTER
|
||||
// char * pMsg, *pStart;
|
||||
// int32_t msgLen = 0;
|
||||
//#endif
|
||||
//
|
||||
// SDnodeObj *pDnode;
|
||||
// SCfgDnodeMsg * pCfg = (SCfgDnodeMsg *)cont;
|
||||
// uint32_t ip;
|
||||
//
|
||||
// ip = inet_addr(pCfg->ip);
|
||||
// pDnode = mgmtGetDnode(ip);
|
||||
// if (pDnode == NULL) {
|
||||
// mError("dnode ip:%s not configured", pCfg->ip);
|
||||
// return TSDB_CODE_NOT_CONFIGURED;
|
||||
// }
|
||||
//
|
||||
// mTrace("dnode:%s, dynamic option received, content:%s", taosIpStr(pDnode->privateIp), pCfg->config);
|
||||
// int32_t code = mgmtCfgDynamicOptions(pDnode, pCfg->config);
|
||||
// if (code != -1) {
|
||||
// return code;
|
||||
// }
|
||||
//
|
||||
//#ifdef CLUSTER
|
||||
// pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG);
|
||||
// if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE;
|
||||
// pMsg = pStart;
|
||||
//
|
||||
// memcpy(pMsg, cont, sizeof(SCfgDnodeMsg));
|
||||
// pMsg += sizeof(SCfgDnodeMsg);
|
||||
//
|
||||
// msgLen = pMsg - pStart;
|
||||
// mgmtSendMsgToDnode(pDnode, pStart, msgLen);
|
||||
//#else
|
||||
// (void)tsCfgDynamicOptions(pCfg->config);
|
||||
//#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mgmtInitDnodeInt() {
|
||||
if (mgmtInitDnodeIntFp) {
|
||||
return mgmtInitDnodeIntFp();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtCleanUpDnodeInt() {
|
||||
if (mgmtCleanUpDnodeIntFp) {
|
||||
mgmtCleanUpDnodeIntFp();
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtProcessDnodeStatus(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
|
||||
SStatusMsg *pStatus = (SStatusMsg *)pCont;
|
||||
|
||||
SDnodeObj *pObj = mgmtGetDnode(htonl(pStatus->privateIp));
|
||||
if (pObj == NULL) {
|
||||
mError("dnode:%s not exist", taosIpStr(pObj->privateIp));
|
||||
mgmtSendRspToDnode(pConn, msgType + 1, TSDB_CODE_DNODE_NOT_EXIST, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
pObj->lastReboot = htonl(pStatus->lastReboot);
|
||||
pObj->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
|
||||
pObj->openVnodes = htons(pStatus->openVnodes);
|
||||
pObj->numOfCores = htons(pStatus->numOfCores);
|
||||
pObj->diskAvailable = pStatus->diskAvailable;
|
||||
pObj->alternativeRole = pStatus->alternativeRole;
|
||||
//
|
||||
// if (mgmtProcessDnodeStatusFp) {
|
||||
// mgmtProcessDnodeStatusFp(pStatus, pObj, pConn);
|
||||
// return;
|
||||
// }
|
||||
|
||||
pObj->status = TSDB_DN_STATUS_READY;
|
||||
|
||||
// // wait vnode dropped
|
||||
// for (int32_t vnode = 0; vnode < pObj->numOfVnodes; ++vnode) {
|
||||
// SVnodeLoad *pVload = &(pObj->vload[vnode]);
|
||||
// if (pVload->dropStatus == TSDB_VN_DROP_STATUS_DROPPING) {
|
||||
// bool existInDnode = false;
|
||||
// for (int32_t j = 0; j < pObj->openVnodes; ++j) {
|
||||
// if (htonl(pStatus->load[j].vnode) == vnode) {
|
||||
// existInDnode = true;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if (!existInDnode) {
|
||||
// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
|
||||
// pVload->status = TSDB_VN_STATUS_OFFLINE;
|
||||
// mgmtUpdateDnode(pObj);
|
||||
// mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode);
|
||||
// taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, tsMgmtTmr);
|
||||
// }
|
||||
// } else if (pVload->vgId == 0) {
|
||||
// /*
|
||||
// * In some cases, vnode information may be reported abnormally, recover it
|
||||
// */
|
||||
// if (pVload->dropStatus != TSDB_VN_DROP_STATUS_READY || pVload->status != TSDB_VN_STATUS_OFFLINE) {
|
||||
// mPrint("dnode:%s, vid:%d, vgroup:%d status:%s dropStatus:%s, set it to avail status",
|
||||
// taosIpStr(pObj->privateIp), vnode, pVload->vgId, taosGetVnodeStatusStr(pVload->status),
|
||||
// taosGetVnodeDropStatusStr(pVload->dropStatus));
|
||||
// pVload->dropStatus = TSDB_VN_DROP_STATUS_READY;
|
||||
// pVload->status = TSDB_VN_STATUS_OFFLINE;
|
||||
// mgmtUpdateDnode(pObj);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
}
|
|
@ -14,6 +14,8 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "trpc.h"
|
||||
#include "tschemautil.h"
|
||||
#include "mgmtMnode.h"
|
||||
#include "mgmtUser.h"
|
||||
|
@ -23,6 +25,10 @@ int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL;
|
|||
int32_t (*mgmtGetMnodesNumFp)() = NULL;
|
||||
void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL;
|
||||
|
||||
bool mgmtCheckRedirect(void *handle) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp) {
|
||||
if (mgmtAddMnodeFp) {
|
||||
return (*mgmtAddMnodeFp)(privateIp, publicIp);
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
#include "mnode.h"
|
||||
#include "mgmtAcct.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnodeInt.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtNormalTable.h"
|
||||
#include "mgmtSuperTable.h"
|
||||
|
@ -289,9 +288,9 @@ void mgmtCleanUpNormalTables() {
|
|||
|
||||
static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
|
||||
int32_t totalCols = pTable->numOfColumns;
|
||||
int32_t contLen = sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
|
||||
int32_t contLen = sizeof(SDMCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
|
||||
|
||||
SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
|
||||
SDMCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
|
||||
if (pCreateTable == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -323,13 +322,13 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr
|
|||
pSchema++;
|
||||
}
|
||||
|
||||
memcpy(pCreateTable + sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
|
||||
memcpy(pCreateTable + sizeof(SDMCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
|
||||
|
||||
return pCreateTable;
|
||||
}
|
||||
|
||||
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
|
||||
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
|
||||
SDMCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
|
||||
int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb);
|
||||
if (numOfTables >= TSDB_MAX_NORMAL_TABLES) {
|
||||
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES);
|
||||
|
@ -406,7 +405,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
|
|||
return TSDB_CODE_OTHERS;
|
||||
}
|
||||
|
||||
SDRemoveTableMsg *pRemove = rpcMallocCont(sizeof(SDRemoveTableMsg));
|
||||
SMDDropTableMsg *pRemove = rpcMallocCont(sizeof(SMDDropTableMsg));
|
||||
if (pRemove == NULL) {
|
||||
mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId);
|
||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
|
@ -423,7 +422,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
|
|||
}
|
||||
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
||||
mgmtSendRemoveTableMsg(pRemove, &ipSet, NULL);
|
||||
mgmtSendDropTableMsg(pRemove, &ipSet, NULL);
|
||||
|
||||
if (sdbDeleteRow(tsNormalTableSdb, pTable) < 0) {
|
||||
mError("table:%s, update ntables sdb error", pTable->tableId);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -28,7 +28,6 @@
|
|||
#include "mgmtChildTable.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnode.h"
|
||||
#include "mgmtDnodeInt.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtSuperTable.h"
|
||||
#include "mgmtTable.h"
|
||||
|
|
|
@ -23,7 +23,6 @@
|
|||
#include "mgmtBalance.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnode.h"
|
||||
#include "mgmtDnodeInt.h"
|
||||
#include "mgmtVgroup.h"
|
||||
#include "mgmtUser.h"
|
||||
#include "mgmtSystem.h"
|
||||
|
|
|
@ -30,7 +30,6 @@
|
|||
#include "mgmtChildTable.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnode.h"
|
||||
#include "mgmtDnodeInt.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtNormalTable.h"
|
||||
#include "mgmtProfile.h"
|
||||
|
@ -412,8 +411,8 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
|
|||
return numOfRows;
|
||||
}
|
||||
|
||||
SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) {
|
||||
SDRemoveTableMsg *pRemove = NULL;
|
||||
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) {
|
||||
SMDDropTableMsg *pRemove = NULL;
|
||||
|
||||
|
||||
return pRemove;
|
||||
|
|
|
@ -15,42 +15,45 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "trpc.h"
|
||||
#include "tschemautil.h"
|
||||
#include "ttime.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtAcct.h"
|
||||
|
||||
|
||||
#include "mgmtMnode.h"
|
||||
#include "mgmtShell.h"
|
||||
#include "mgmtUser.h"
|
||||
|
||||
#include "mgmtAcct.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtTable.h"
|
||||
|
||||
|
||||
void *tsUserSdb = NULL;
|
||||
static int32_t tsUserUpdateSize = 0;
|
||||
|
||||
void *(*mgmtUserActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
void *mgmtUserActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
void *mgmtUserActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
void *mgmtUserActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
void *mgmtUserActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
void *mgmtUserActionReset(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
||||
static int32_t mgmtDropUser(SAcctObj *pAcct, char *name);
|
||||
static int32_t mgmtUpdateUser(SUserObj *pUser);
|
||||
static int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
static SUserObj *mgmtGetUserFromConn(void *pConn);
|
||||
|
||||
void mgmtUserActionInit() {
|
||||
mgmtUserActionFp[SDB_TYPE_INSERT] = mgmtUserActionInsert;
|
||||
mgmtUserActionFp[SDB_TYPE_DELETE] = mgmtUserActionDelete;
|
||||
mgmtUserActionFp[SDB_TYPE_UPDATE] = mgmtUserActionUpdate;
|
||||
mgmtUserActionFp[SDB_TYPE_ENCODE] = mgmtUserActionEncode;
|
||||
mgmtUserActionFp[SDB_TYPE_DECODE] = mgmtUserActionDecode;
|
||||
mgmtUserActionFp[SDB_TYPE_RESET] = mgmtUserActionReset;
|
||||
mgmtUserActionFp[SDB_TYPE_DESTROY] = mgmtUserActionDestroy;
|
||||
}
|
||||
static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg);
|
||||
static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg);
|
||||
static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg);
|
||||
|
||||
void *mgmtUserAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
if (mgmtUserActionFp[(uint8_t) action] != NULL) {
|
||||
return (*(mgmtUserActionFp[(uint8_t) action]))(row, str, size, ssize);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
static void *(*mgmtUserActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static void *mgmtUserActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static void *mgmtUserActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static void *mgmtUserActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static void *mgmtUserActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static void *mgmtUserActionReset(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static void *mgmtUserAction(char action, void *row, char *str, int32_t size, int32_t *ssize);
|
||||
static void mgmtUserActionInit();
|
||||
|
||||
int32_t mgmtInitUsers() {
|
||||
void *pNode = NULL;
|
||||
|
@ -87,19 +90,29 @@ int32_t mgmtInitUsers() {
|
|||
mgmtCreateUser(pAcct, "monitor", tsInternalPass);
|
||||
mgmtCreateUser(pAcct, "_root", tsInternalPass);
|
||||
|
||||
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CREATE_USER, mgmtProcessCreateUserMsg);
|
||||
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_ALTER_USER, mgmtProcessAlterUserMsg);
|
||||
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_DROP_USER, mgmtProcessDropUserMsg);
|
||||
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_USER, mgmtGetUserMeta);
|
||||
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mgmtRetrieveUsers);
|
||||
|
||||
mTrace("user data is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mgmtCleanUpUsers() {
|
||||
sdbCloseTable(tsUserSdb);
|
||||
}
|
||||
|
||||
SUserObj *mgmtGetUser(char *name) {
|
||||
return (SUserObj *)sdbGetRow(tsUserSdb, name);
|
||||
}
|
||||
|
||||
int32_t mgmtUpdateUser(SUserObj *pUser) {
|
||||
static int32_t mgmtUpdateUser(SUserObj *pUser) {
|
||||
return sdbUpdateRow(tsUserSdb, pUser, 0, 1);
|
||||
}
|
||||
|
||||
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
||||
static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
||||
int32_t code = mgmtCheckUserLimit(pAcct);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
|
@ -141,7 +154,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mgmtDropUser(SAcctObj *pAcct, char *name) {
|
||||
static int32_t mgmtDropUser(SAcctObj *pAcct, char *name) {
|
||||
SUserObj *pUser;
|
||||
|
||||
pUser = (SUserObj *)sdbGetRow(tsUserSdb, name);
|
||||
|
@ -159,11 +172,7 @@ int32_t mgmtDropUser(SAcctObj *pAcct, char *name) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void mgmtCleanUpUsers() {
|
||||
sdbCloseTable(tsUserSdb);
|
||||
}
|
||||
|
||||
int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
|
||||
static int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
|
||||
SUserObj *pUser = mgmtGetUserFromConn(pConn);
|
||||
if (pUser == NULL) {
|
||||
return TSDB_CODE_INVALID_USER;
|
||||
|
@ -206,7 +215,7 @@ int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
|
||||
static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
|
||||
int32_t numOfRows = 0;
|
||||
SUserObj *pUser = NULL;
|
||||
int32_t cols = 0;
|
||||
|
@ -243,7 +252,24 @@ int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn
|
|||
return numOfRows;
|
||||
}
|
||||
|
||||
void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
static void mgmtUserActionInit() {
|
||||
mgmtUserActionFp[SDB_TYPE_INSERT] = mgmtUserActionInsert;
|
||||
mgmtUserActionFp[SDB_TYPE_DELETE] = mgmtUserActionDelete;
|
||||
mgmtUserActionFp[SDB_TYPE_UPDATE] = mgmtUserActionUpdate;
|
||||
mgmtUserActionFp[SDB_TYPE_ENCODE] = mgmtUserActionEncode;
|
||||
mgmtUserActionFp[SDB_TYPE_DECODE] = mgmtUserActionDecode;
|
||||
mgmtUserActionFp[SDB_TYPE_RESET] = mgmtUserActionReset;
|
||||
mgmtUserActionFp[SDB_TYPE_DESTROY] = mgmtUserActionDestroy;
|
||||
}
|
||||
|
||||
static void *mgmtUserAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
if (mgmtUserActionFp[(uint8_t) action] != NULL) {
|
||||
return (*(mgmtUserActionFp[(uint8_t) action]))(row, str, size, ssize);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
SUserObj *pUser = (SUserObj *) row;
|
||||
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
|
||||
|
||||
|
@ -253,7 +279,7 @@ void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void *mgmtUserActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
static void *mgmtUserActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
SUserObj *pUser = (SUserObj *) row;
|
||||
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
|
||||
|
||||
|
@ -262,11 +288,11 @@ void *mgmtUserActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void *mgmtUserActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
static void *mgmtUserActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
return mgmtUserActionReset(row, str, size, ssize);
|
||||
}
|
||||
|
||||
void *mgmtUserActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
static void *mgmtUserActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
SUserObj *pUser = (SUserObj *) row;
|
||||
|
||||
if (size < tsUserUpdateSize) {
|
||||
|
@ -279,7 +305,7 @@ void *mgmtUserActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void *mgmtUserActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
static void *mgmtUserActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
SUserObj *pUser = (SUserObj *) malloc(sizeof(SUserObj));
|
||||
if (pUser == NULL) return NULL;
|
||||
memset(pUser, 0, sizeof(SUserObj));
|
||||
|
@ -289,7 +315,7 @@ void *mgmtUserActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
|
|||
return (void *)pUser;
|
||||
}
|
||||
|
||||
void *mgmtUserActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
static void *mgmtUserActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
SUserObj *pUser = (SUserObj *)row;
|
||||
|
||||
memcpy(pUser, str, tsUserUpdateSize);
|
||||
|
@ -297,15 +323,200 @@ void *mgmtUserActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
static void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
|
||||
tfree(row);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SUserObj *mgmtGetUserFromConn(void *pConn) {
|
||||
static SUserObj *mgmtGetUserFromConn(void *pConn) {
|
||||
SRpcConnInfo connInfo;
|
||||
rpcGetConnInfo(pConn, &connInfo);
|
||||
|
||||
return mgmtGetUser(connInfo.user);
|
||||
}
|
||||
|
||||
static void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) {
|
||||
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
if (mgmtCheckRedirect(rpcMsg->handle)) return;
|
||||
|
||||
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle);
|
||||
if (pUser == NULL) {
|
||||
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pUser->superAuth) {
|
||||
SCreateUserMsg *pCreate = rpcMsg->pCont;
|
||||
rpcRsp.code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass);
|
||||
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("user:%s is created by %s", pCreate->user, pUser->user);
|
||||
}
|
||||
} else {
|
||||
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
|
||||
}
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) {
|
||||
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
if (mgmtCheckRedirect(rpcMsg->handle)) return;
|
||||
|
||||
SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle);
|
||||
if (pOperUser == NULL) {
|
||||
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return;
|
||||
}
|
||||
|
||||
SAlterUserMsg *pAlter = rpcMsg->pCont;
|
||||
SUserObj *pUser = mgmtGetUser(pAlter->user);
|
||||
if (pUser == NULL) {
|
||||
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return;
|
||||
}
|
||||
|
||||
if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
|
||||
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return;
|
||||
}
|
||||
|
||||
if ((pAlter->flag & TSDB_ALTER_USER_PASSWD) != 0) {
|
||||
bool hasRight = false;
|
||||
if (strcmp(pOperUser->user, "root") == 0) {
|
||||
hasRight = true;
|
||||
} else if (strcmp(pUser->user, pOperUser->user) == 0) {
|
||||
hasRight = true;
|
||||
} else if (pOperUser->superAuth) {
|
||||
if (strcmp(pUser->user, "root") == 0) {
|
||||
hasRight = false;
|
||||
} else if (strcmp(pOperUser->acct, pUser->acct) != 0) {
|
||||
hasRight = false;
|
||||
} else {
|
||||
hasRight = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (hasRight) {
|
||||
memset(pUser->pass, 0, sizeof(pUser->pass));
|
||||
taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass);
|
||||
rpcRsp.code = mgmtUpdateUser(pUser);
|
||||
mLPrint("user:%s password is altered by %s, code:%d", pAlter->user, pUser->user, rpcRsp.code);
|
||||
} else {
|
||||
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
|
||||
}
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return;
|
||||
}
|
||||
|
||||
if ((pAlter->flag & TSDB_ALTER_USER_PRIVILEGES) != 0) {
|
||||
bool hasRight = false;
|
||||
|
||||
if (strcmp(pUser->user, "root") == 0) {
|
||||
hasRight = false;
|
||||
} else if (strcmp(pUser->user, pUser->acct) == 0) {
|
||||
hasRight = false;
|
||||
} else if (strcmp(pOperUser->user, "root") == 0) {
|
||||
hasRight = true;
|
||||
} else if (strcmp(pUser->user, pOperUser->user) == 0) {
|
||||
hasRight = false;
|
||||
} else if (pOperUser->superAuth) {
|
||||
if (strcmp(pUser->user, "root") == 0) {
|
||||
hasRight = false;
|
||||
} else if (strcmp(pOperUser->acct, pUser->acct) != 0) {
|
||||
hasRight = false;
|
||||
} else {
|
||||
hasRight = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (pAlter->privilege == 1) { // super
|
||||
hasRight = false;
|
||||
}
|
||||
|
||||
if (hasRight) {
|
||||
//if (pAlter->privilege == 1) { // super
|
||||
// pUser->superAuth = 1;
|
||||
// pUser->writeAuth = 1;
|
||||
//}
|
||||
if (pAlter->privilege == 2) { // read
|
||||
pUser->superAuth = 0;
|
||||
pUser->writeAuth = 0;
|
||||
}
|
||||
if (pAlter->privilege == 3) { // write
|
||||
pUser->superAuth = 0;
|
||||
pUser->writeAuth = 1;
|
||||
}
|
||||
|
||||
rpcRsp.code = mgmtUpdateUser(pUser);
|
||||
mLPrint("user:%s privilege is altered by %s, code:%d", pAlter->user, pUser->user, rpcRsp.code);
|
||||
} else {
|
||||
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
|
||||
}
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return;
|
||||
}
|
||||
|
||||
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) {
|
||||
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
if (mgmtCheckRedirect(rpcMsg->handle)) return;
|
||||
|
||||
SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle);
|
||||
if (pOperUser == NULL) {
|
||||
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return ;
|
||||
}
|
||||
|
||||
SDropUserMsg *pDrop = rpcMsg->pCont;
|
||||
SUserObj *pUser = mgmtGetUser(pDrop->user);
|
||||
if (pUser == NULL) {
|
||||
rpcRsp.code = TSDB_CODE_INVALID_USER;
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return ;
|
||||
}
|
||||
|
||||
if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
|
||||
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
|
||||
rpcSendResponse(&rpcRsp);
|
||||
return ;
|
||||
}
|
||||
|
||||
bool hasRight = false;
|
||||
if (strcmp(pUser->user, "root") == 0) {
|
||||
hasRight = false;
|
||||
} else if (strcmp(pOperUser->user, "root") == 0) {
|
||||
hasRight = true;
|
||||
} else if (strcmp(pUser->user, pOperUser->user) == 0) {
|
||||
hasRight = false;
|
||||
} else if (pOperUser->superAuth) {
|
||||
if (strcmp(pUser->user, "root") == 0) {
|
||||
hasRight = false;
|
||||
} else if (strcmp(pOperUser->acct, pUser->acct) != 0) {
|
||||
hasRight = false;
|
||||
} else {
|
||||
hasRight = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (hasRight) {
|
||||
rpcRsp.code = mgmtDropUser(pUser->pAcct, pDrop->user);
|
||||
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("user:%s is dropped by %s", pDrop->user, pUser->user);
|
||||
}
|
||||
} else {
|
||||
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
|
||||
}
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@
|
|||
#include "mgmtBalance.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnode.h"
|
||||
#include "mgmtDnodeInt.h"
|
||||
#include "mgmtTable.h"
|
||||
#include "mgmtVgroup.h"
|
||||
|
||||
|
@ -193,7 +192,7 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
|
|||
|
||||
mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||
|
||||
mgmtSendRemoveVgroupMsg(pVgroup, NULL);
|
||||
mgmtSendDropVgroupMsg(pVgroup, NULL);
|
||||
|
||||
sdbDeleteRow(tsVgroupSdb, pVgroup);
|
||||
|
||||
|
@ -503,11 +502,11 @@ void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) {
|
|||
taosFreeId(pVgroup->idPool, pTable->sid);
|
||||
}
|
||||
|
||||
SCreateVnodeMsg *mgmtBuildVpeersMsg(SVgObj *pVgroup, int32_t vnode) {
|
||||
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) {
|
||||
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
|
||||
if (pDb == NULL) return NULL;
|
||||
|
||||
SCreateVnodeMsg *pVPeers = rpcMallocCont(sizeof(SCreateVnodeMsg));
|
||||
SMDCreateVnodeMsg *pVPeers = rpcMallocCont(sizeof(SMDCreateVnodeMsg));
|
||||
if (pVPeers == NULL) return NULL;
|
||||
|
||||
pVPeers->vnode = htonl(vnode);
|
||||
|
|
Loading…
Reference in New Issue