TD-10431 create dnode
This commit is contained in:
parent
47e1d6954d
commit
23da7d4230
|
@ -829,7 +829,16 @@ typedef struct SShowRsp {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char ep[TSDB_EP_LEN]; // end point, hostname:port
|
char ep[TSDB_EP_LEN]; // end point, hostname:port
|
||||||
} SCreateDnodeMsg, SDropDnodeMsg;
|
} SCreateDnodeMsg;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t dnodeId;
|
||||||
|
} SDropDnodeMsg;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t dnodeId;
|
||||||
|
char config[128];
|
||||||
|
} SCfgDnodeMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
|
@ -849,11 +858,6 @@ typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
} SConfigVnodeMsg;
|
} SConfigVnodeMsg;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
char ep[TSDB_EP_LEN]; // end point, hostname:port
|
|
||||||
char config[64];
|
|
||||||
} SCfgDnodeMsg;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char sql[TSDB_SHOW_SQL_LEN];
|
char sql[TSDB_SHOW_SQL_LEN];
|
||||||
int32_t queryId;
|
int32_t queryId;
|
||||||
|
|
|
@ -196,7 +196,6 @@ extern SDiskCfg tsDiskCfg[];
|
||||||
void taosInitGlobalCfg();
|
void taosInitGlobalCfg();
|
||||||
int32_t taosCheckGlobalCfg();
|
int32_t taosCheckGlobalCfg();
|
||||||
int32_t taosCfgDynamicOptions(char *msg);
|
int32_t taosCfgDynamicOptions(char *msg);
|
||||||
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
|
|
||||||
bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId);
|
bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId);
|
||||||
void taosAddDataDir(int index, char *v1, int level, int primary);
|
void taosAddDataDir(int index, char *v1, int level, int primary);
|
||||||
void taosReadDataDirCfg(char *v1, char *v2, char *v3);
|
void taosReadDataDirCfg(char *v1, char *v2, char *v3);
|
||||||
|
|
|
@ -21,22 +21,22 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tmd5.h"
|
|
||||||
#include "tcrc32c.h"
|
#include "tcrc32c.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
|
#include "tmd5.h"
|
||||||
|
|
||||||
int32_t strdequote(char *src);
|
int32_t strdequote(char *src);
|
||||||
int32_t strndequote(char *dst, const char* z, int32_t len);
|
int32_t strndequote(char *dst, const char *z, int32_t len);
|
||||||
int32_t strRmquote(char *z, int32_t len);
|
int32_t strRmquote(char *z, int32_t len);
|
||||||
size_t strtrim(char *src);
|
size_t strtrim(char *src);
|
||||||
char * strnchr(char *haystack, char needle, int32_t len, bool skipquote);
|
char *strnchr(char *haystack, char needle, int32_t len, bool skipquote);
|
||||||
char ** strsplit(char *src, const char *delim, int32_t *num);
|
char **strsplit(char *src, const char *delim, int32_t *num);
|
||||||
char * strtolower(char *dst, const char *src);
|
char *strtolower(char *dst, const char *src);
|
||||||
char * strntolower(char *dst, const char *src, int32_t n);
|
char *strntolower(char *dst, const char *src, int32_t n);
|
||||||
char * strntolower_s(char *dst, const char *src, int32_t n);
|
char *strntolower_s(char *dst, const char *src, int32_t n);
|
||||||
int64_t strnatoi(char *num, int32_t len);
|
int64_t strnatoi(char *num, int32_t len);
|
||||||
char * strbetween(char *string, char *begin, char *end);
|
char *strbetween(char *string, char *begin, char *end);
|
||||||
char * paGetToken(char *src, char **token, int32_t *tokenLen);
|
char *paGetToken(char *src, char **token, int32_t *tokenLen);
|
||||||
|
|
||||||
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]);
|
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]);
|
||||||
int32_t taosHexStrToByteArray(char hexstr[], char bytes[]);
|
int32_t taosHexStrToByteArray(char hexstr[], char bytes[]);
|
||||||
|
@ -45,11 +45,12 @@ char *taosIpStr(uint32_t ipInt);
|
||||||
uint32_t ip2uint(const char *const ip_addr);
|
uint32_t ip2uint(const char *const ip_addr);
|
||||||
void taosIp2String(uint32_t ip, char *str);
|
void taosIp2String(uint32_t ip, char *str);
|
||||||
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
|
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
|
||||||
|
int32_t taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
|
||||||
|
|
||||||
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
|
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
|
||||||
MD5_CTX context;
|
MD5_CTX context;
|
||||||
MD5Init(&context);
|
MD5Init(&context);
|
||||||
MD5Update(&context, inBuf, (unsigned int)inLen);
|
MD5Update(&context, inBuf, (uint32_t)inLen);
|
||||||
MD5Final(&context);
|
MD5Final(&context);
|
||||||
memcpy(target, context.digest, TSDB_KEY_LEN);
|
memcpy(target, context.digest, TSDB_KEY_LEN);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1738,24 +1738,6 @@ int32_t taosCheckGlobalCfg() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
|
|
||||||
*port = 0;
|
|
||||||
strcpy(fqdn, ep);
|
|
||||||
|
|
||||||
char *temp = strchr(fqdn, ':');
|
|
||||||
if (temp) {
|
|
||||||
*temp = 0;
|
|
||||||
*port = atoi(temp+1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*port == 0) {
|
|
||||||
*port = tsServerPort;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* alter dnode 1 balance "vnode:1-dnode:2"
|
* alter dnode 1 balance "vnode:1-dnode:2"
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -286,18 +286,16 @@ typedef struct SFuncObj {
|
||||||
char pData[];
|
char pData[];
|
||||||
} SFuncObj;
|
} SFuncObj;
|
||||||
|
|
||||||
typedef struct SShowObj SShowObj;
|
typedef struct {
|
||||||
typedef struct SShowObj {
|
|
||||||
int8_t type;
|
|
||||||
int8_t maxReplica;
|
|
||||||
int16_t numOfColumns;
|
|
||||||
int32_t id;
|
int32_t id;
|
||||||
|
int8_t type;
|
||||||
|
int8_t replica;
|
||||||
|
int16_t numOfColumns;
|
||||||
int32_t rowSize;
|
int32_t rowSize;
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
int32_t numOfReads;
|
int32_t numOfReads;
|
||||||
uint16_t payloadLen;
|
int32_t payloadLen;
|
||||||
void *pIter;
|
void *pIter;
|
||||||
void *pVgIter;
|
|
||||||
SMnode *pMnode;
|
SMnode *pMnode;
|
||||||
char db[TSDB_FULL_DB_NAME_LEN];
|
char db[TSDB_FULL_DB_NAME_LEN];
|
||||||
int16_t offset[TSDB_MAX_COLUMNS];
|
int16_t offset[TSDB_MAX_COLUMNS];
|
||||||
|
|
|
@ -26,6 +26,7 @@ int32_t mndInitDnode(SMnode *pMnode);
|
||||||
void mndCleanupDnode(SMnode *pMnode);
|
void mndCleanupDnode(SMnode *pMnode);
|
||||||
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId);
|
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId);
|
||||||
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
|
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
|
||||||
|
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp);
|
||||||
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
|
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
|
||||||
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp);
|
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp);
|
||||||
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
|
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
|
||||||
|
char *mndShowStr(int32_t showType);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ extern "C" {
|
||||||
|
|
||||||
int32_t mndInitVgroup(SMnode *pMnode);
|
int32_t mndInitVgroup(SMnode *pMnode);
|
||||||
void mndCleanupVgroup(SMnode *pMnode);
|
void mndCleanupVgroup(SMnode *pMnode);
|
||||||
|
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,12 +16,17 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
#define SDB_DNODE_VER 1
|
#define TSDB_DNODE_VER 1
|
||||||
|
#define TSDB_CONFIG_OPTION_LEN 16
|
||||||
|
#define TSDB_CONIIG_VALUE_LEN 48
|
||||||
|
#define TSDB_CONFIG_NUMBER 8
|
||||||
|
|
||||||
static char *offlineReason[] = {
|
static const char *offlineReason[] = {
|
||||||
"",
|
"",
|
||||||
"status msg timeout",
|
"status msg timeout",
|
||||||
"status not received",
|
"status not received",
|
||||||
|
@ -36,17 +41,28 @@ static char *offlineReason[] = {
|
||||||
"unknown",
|
"unknown",
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static const char *dnodeStatus[] = {"offline", "ready", "creating", "dropping"};
|
||||||
|
|
||||||
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
|
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
|
||||||
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
|
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
|
||||||
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
|
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
|
||||||
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
|
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
|
||||||
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode);
|
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode);
|
||||||
|
|
||||||
static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg);
|
||||||
|
static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg);
|
||||||
|
|
||||||
|
static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
|
||||||
|
static int32_t mndRetrieveConfigs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||||
|
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
|
||||||
|
static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
|
||||||
|
static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||||
|
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitDnode(SMnode *pMnode) {
|
int32_t mndInitDnode(SMnode *pMnode) {
|
||||||
SSdbTable table = {.sdbType = SDB_DNODE,
|
SSdbTable table = {.sdbType = SDB_DNODE,
|
||||||
.keyType = SDB_KEY_INT32,
|
.keyType = SDB_KEY_INT32,
|
||||||
|
@ -60,8 +76,16 @@ int32_t mndInitDnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg);
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg);
|
||||||
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg);
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg);
|
||||||
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg);
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg);
|
||||||
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP, mndProcessConfigDnodeRsp);
|
||||||
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS, mndProcessStatusMsg);
|
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS, mndProcessStatusMsg);
|
||||||
|
|
||||||
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndGetConfigMeta);
|
||||||
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndRetrieveConfigs);
|
||||||
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndCancelGetNextConfig);
|
||||||
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndGetDnodeMeta);
|
||||||
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
|
||||||
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,7 +108,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
|
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, SDB_DNODE_VER, sizeof(SDnodeObj));
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER, sizeof(SDnodeObj));
|
||||||
if (pRaw == NULL) return NULL;
|
if (pRaw == NULL) return NULL;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
|
@ -102,7 +126,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
|
||||||
|
|
||||||
if (sver != SDB_DNODE_VER) {
|
if (sver != TSDB_DNODE_VER) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
mError("failed to decode dnode since %s", terrstr());
|
mError("failed to decode dnode since %s", terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -168,6 +192,12 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
|
||||||
sdbRelease(pSdb, pDnode);
|
sdbRelease(pSdb, pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
|
||||||
|
SEpSet epSet = {.inUse = 0, .numOfEps = 1, .port[0] = pDnode->port};
|
||||||
|
memcpy(epSet.fqdn[0], pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
return epSet;
|
||||||
|
}
|
||||||
|
|
||||||
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
|
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
@ -358,8 +388,384 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { return 0; }
|
static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) {
|
||||||
|
SDnodeObj dnodeObj = {0};
|
||||||
|
dnodeObj.id = 1; // todo
|
||||||
|
dnodeObj.createdTime = taosGetTimestampMs();
|
||||||
|
dnodeObj.updateTime = dnodeObj.createdTime;
|
||||||
|
taosGetFqdnPortFromEp(pCreate->ep, dnodeObj.fqdn, &dnodeObj.port);
|
||||||
|
|
||||||
static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { return 0; }
|
if (dnodeObj.fqdn[0] == 0 || dnodeObj.port <= 0) {
|
||||||
|
terrno = TSDB_CODE_SDB_APP_ERROR;
|
||||||
|
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { return 0; }
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("trans:%d, used to create dnode:%s", pTrans->id, pCreate->ep);
|
||||||
|
|
||||||
|
SSdbRaw *pRedoRaw = mndDnodeActionEncode(&dnodeObj);
|
||||||
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
|
||||||
|
|
||||||
|
SSdbRaw *pUndoRaw = mndDnodeActionEncode(&dnodeObj);
|
||||||
|
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
|
||||||
|
|
||||||
|
SSdbRaw *pCommitRaw = mndDnodeActionEncode(&dnodeObj);
|
||||||
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
if (mndTransPrepare(pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
|
||||||
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
|
SCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||||
|
|
||||||
|
mDebug("dnode:%s, start to create", pCreate->ep);
|
||||||
|
|
||||||
|
if (pCreate->ep[0] == 0) {
|
||||||
|
terrno = TSDB_CODE_SDB_APP_ERROR;
|
||||||
|
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDnodeObj *pDnode = mndAcquireDnodeByEp(pMnode, pCreate->ep);
|
||||||
|
if (pDnode != NULL) {
|
||||||
|
mError("dnode:%d, already exist", pDnode->id);
|
||||||
|
sdbRelease(pMnode->pSdb, pDnode);
|
||||||
|
terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = mndCreateDnode(pMnode, pMsg, pCreate);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode) {
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("trans:%d, used to drop user:%d", pTrans->id, pDnode->id);
|
||||||
|
|
||||||
|
SSdbRaw *pRedoRaw = mndDnodeActionEncode(pDnode);
|
||||||
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
|
||||||
|
|
||||||
|
SSdbRaw *pUndoRaw = mndDnodeActionEncode(pDnode);
|
||||||
|
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode);
|
||||||
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||||
|
|
||||||
|
if (mndTransPrepare(pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) {
|
||||||
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
|
SDropDnodeMsg *pDrop = pMsg->rpcMsg.pCont;
|
||||||
|
pDrop->dnodeId = htonl(pDrop->dnodeId);
|
||||||
|
|
||||||
|
mDebug("dnode:%d, start to drop", pDrop->dnodeId);
|
||||||
|
|
||||||
|
if (pDrop->dnodeId <= 0) {
|
||||||
|
terrno = TSDB_CODE_SDB_APP_ERROR;
|
||||||
|
mError("dnode:%d, failed to create since %s", pDrop->dnodeId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDrop->dnodeId);
|
||||||
|
if (pDnode == NULL) {
|
||||||
|
mError("dnode:%d, not exist", pDrop->dnodeId);
|
||||||
|
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = mndDropDnode(pMnode, pMsg, pDnode);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
mError("dnode:%d, failed to create since %s", pDrop->dnodeId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pMnode->pSdb, pDnode);
|
||||||
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) {
|
||||||
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
|
SCfgDnodeMsg *pCfg = pMsg->rpcMsg.pCont;
|
||||||
|
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||||
|
|
||||||
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCfg->dnodeId);
|
||||||
|
if (pDnode == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
|
mError("dnode:%d, failed to cfg since %s ", pCfg->dnodeId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SEpSet epSet = mndGetDnodeEpset(pDnode);
|
||||||
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
|
SCfgDnodeMsg *pCfgDnode = rpcMallocCont(sizeof(SCfgDnodeMsg));
|
||||||
|
pCfgDnode->dnodeId = htonl(pCfg->dnodeId);
|
||||||
|
memcpy(pCfgDnode->config, pCfg->config, 128);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_CONFIG_DNODE_IN, .pCont = pCfgDnode, .contLen = sizeof(SCfgDnodeMsg)};
|
||||||
|
|
||||||
|
mInfo("dnode:%d, is configured", pCfg->dnodeId);
|
||||||
|
mndSendMsgToDnode(pMnode, &epSet, &rpcMsg);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pMsg) { mInfo("cfg dnode rsp is received"); }
|
||||||
|
|
||||||
|
static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
|
||||||
|
int32_t cols = 0;
|
||||||
|
SSchema *pSchema = pMeta->schema;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
tstrncpy(pSchema[cols].name, "name", sizeof(pSchema[cols].name));
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
tstrncpy(pSchema[cols].name, "value", sizeof(pSchema[cols].name));
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pMeta->numOfColumns = htons(cols);
|
||||||
|
pShow->numOfColumns = cols;
|
||||||
|
|
||||||
|
pShow->offset[0] = 0;
|
||||||
|
for (int32_t i = 1; i < cols; ++i) {
|
||||||
|
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
pShow->numOfRows = TSDB_CONFIG_NUMBER;
|
||||||
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
|
pShow->pIter = NULL;
|
||||||
|
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndRetrieveConfigs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
char *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
|
||||||
|
char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONIIG_VALUE_LEN + 1] = {0};
|
||||||
|
char *pWrite;
|
||||||
|
int32_t cols = 0;
|
||||||
|
|
||||||
|
cfgOpts[numOfRows] = "statusInterval";
|
||||||
|
snprintf(cfgVals[numOfRows], TSDB_CONIIG_VALUE_LEN, "%d", pMnode->cfg.statusInterval);
|
||||||
|
numOfRows++;
|
||||||
|
|
||||||
|
cfgOpts[numOfRows] = "timezone";
|
||||||
|
snprintf(cfgVals[numOfRows], TSDB_CONIIG_VALUE_LEN, "%s", pMnode->cfg.timezone);
|
||||||
|
numOfRows++;
|
||||||
|
|
||||||
|
cfgOpts[numOfRows] = "locale";
|
||||||
|
snprintf(cfgVals[numOfRows], TSDB_CONIIG_VALUE_LEN, "%s", pMnode->cfg.locale);
|
||||||
|
numOfRows++;
|
||||||
|
|
||||||
|
cfgOpts[numOfRows] = "charset";
|
||||||
|
snprintf(cfgVals[numOfRows], TSDB_CONIIG_VALUE_LEN, "%s", pMnode->cfg.charset);
|
||||||
|
numOfRows++;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfRows; i++) {
|
||||||
|
cols = 0;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, cfgVals[i], TSDB_CONIIG_VALUE_LEN);
|
||||||
|
cols++;
|
||||||
|
}
|
||||||
|
|
||||||
|
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||||
|
pShow->numOfReads += numOfRows;
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
|
||||||
|
|
||||||
|
static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
|
||||||
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
int32_t cols = 0;
|
||||||
|
SSchema *pSchema = pMeta->schema;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = 2;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
|
strcpy(pSchema[cols].name, "id");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
strcpy(pSchema[cols].name, "end point");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = 2;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
|
strcpy(pSchema[cols].name, "vnodes");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = 2;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
|
strcpy(pSchema[cols].name, "cores");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
strcpy(pSchema[cols].name, "status");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = 8;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
strcpy(pSchema[cols].name, "create time");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
strcpy(pSchema[cols].name, "offline reason");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pMeta->numOfColumns = htons(cols);
|
||||||
|
pShow->numOfColumns = cols;
|
||||||
|
|
||||||
|
pShow->offset[0] = 0;
|
||||||
|
for (int32_t i = 1; i < cols; ++i) {
|
||||||
|
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE);
|
||||||
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
|
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
int32_t cols = 0;
|
||||||
|
SDnodeObj *pDnode = NULL;
|
||||||
|
char *pWrite;
|
||||||
|
|
||||||
|
while (numOfRows < rows) {
|
||||||
|
pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode);
|
||||||
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
|
cols = 0;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
*(int16_t *)pWrite = pDnode->id;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->ep, pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
*(int16_t *)pWrite = pDnode->numOfVnodes;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
*(int16_t *)pWrite = pDnode->numOfCores;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
const char *status = dnodeStatus[pDnode->status];
|
||||||
|
STR_TO_VARSTR(pWrite, status);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
*(int64_t *)pWrite = pDnode->createdTime;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
STR_TO_VARSTR(pWrite, offlineReason[pDnode->offlineReason]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
numOfRows++;
|
||||||
|
sdbRelease(pSdb, pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||||
|
pShow->numOfReads += numOfRows;
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
}
|
|
@ -20,7 +20,6 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg);
|
||||||
static void mndFreeShowObj(SShowObj *pShow);
|
static void mndFreeShowObj(SShowObj *pShow);
|
||||||
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId);
|
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId);
|
||||||
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
|
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
|
||||||
static char *mndShowStr(int32_t showType);
|
|
||||||
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg);
|
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg);
|
||||||
static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMsg);
|
||||||
static bool mndCheckRetrieveFinished(SShowObj *pShow);
|
static bool mndCheckRetrieveFinished(SShowObj *pShow);
|
||||||
|
@ -88,10 +87,6 @@ static void mndFreeShowObj(SShowObj *pShow) {
|
||||||
|
|
||||||
ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type];
|
ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type];
|
||||||
if (freeFp != NULL) {
|
if (freeFp != NULL) {
|
||||||
if (pShow->pVgIter != NULL) {
|
|
||||||
// only used in 'show vnodes "ep"'
|
|
||||||
(*freeFp)(pMnode, pShow->pVgIter);
|
|
||||||
}
|
|
||||||
if (pShow->pIter != NULL) {
|
if (pShow->pIter != NULL) {
|
||||||
(*freeFp)(pMnode, pShow->pIter);
|
(*freeFp)(pMnode, pShow->pIter);
|
||||||
}
|
}
|
||||||
|
@ -259,7 +254,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static char *mndShowStr(int32_t showType) {
|
char *mndShowStr(int32_t showType) {
|
||||||
switch (showType) {
|
switch (showType) {
|
||||||
case TSDB_MGMT_TABLE_ACCT:
|
case TSDB_MGMT_TABLE_ACCT:
|
||||||
return "show accounts";
|
return "show accounts";
|
||||||
|
|
|
@ -277,7 +277,7 @@ static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewU
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropUser(SMnode *pMnode, SUserObj *pUser, SMnodeMsg *pMsg) {
|
static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) {
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
|
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
|
||||||
|
@ -437,7 +437,7 @@ static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mndDropUser(pMnode, pUser, pMsg);
|
int32_t code = mndDropUser(pMnode, pMsg, pUser);
|
||||||
sdbRelease(pMnode->pSdb, pOperUser);
|
sdbRelease(pMnode->pSdb, pOperUser);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -489,7 +489,8 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
|
||||||
|
|
||||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
|
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
strcpy(pMeta->tableFname, "show users");
|
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,8 +14,114 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "mndVgroup.h"
|
||||||
#include "mndInt.h"
|
#include "mndDnode.h"
|
||||||
|
#include "mndShow.h"
|
||||||
|
#include "mndTrans.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
|
||||||
|
static char *syncRole[] = {"unsynced", "slave", "master"};
|
||||||
|
|
||||||
|
static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
|
||||||
|
static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||||
|
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
int32_t mndInitVgroup(SMnode *pMnode) {
|
||||||
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndGetVnodeMeta);
|
||||||
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
|
||||||
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndInitVgroup(SMnode *pMnode) { return 0; }
|
|
||||||
void mndCleanupVgroup(SMnode *pMnode) {}
|
void mndCleanupVgroup(SMnode *pMnode) {}
|
||||||
|
|
||||||
|
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
|
||||||
|
if (dnodeId == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
|
||||||
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
int32_t cols = 0;
|
||||||
|
SSchema *pSchema = pMeta->schema;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = 4;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||||
|
strcpy(pSchema[cols].name, "vgId");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
strcpy(pSchema[cols].name, "status");
|
||||||
|
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pMeta->numOfColumns = htons(cols);
|
||||||
|
pShow->numOfColumns = cols;
|
||||||
|
|
||||||
|
pShow->offset[0] = 0;
|
||||||
|
for (int32_t i = 1; i < cols; ++i) {
|
||||||
|
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dnodeId = 0;
|
||||||
|
if (pShow->payloadLen > 0) {
|
||||||
|
dnodeId = atoi(pShow->payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
pShow->replica = dnodeId;
|
||||||
|
pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
|
||||||
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
|
strcpy(pMeta->tableFname, mndShowStr(pShow->type));
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
char *pWrite;
|
||||||
|
int32_t cols = 0;
|
||||||
|
int32_t dnodeId = pShow->replica;
|
||||||
|
|
||||||
|
while (numOfRows < rows) {
|
||||||
|
pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
|
||||||
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pVgroup->numOfVnodes && numOfRows < rows; ++i) {
|
||||||
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||||
|
if (pVgid->dnodeId != dnodeId) continue;
|
||||||
|
|
||||||
|
cols = 0;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
*(uint32_t *)pWrite = pVgroup->vgId;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
STR_TO_VARSTR(pWrite, syncRole[pVgid->role]);
|
||||||
|
cols++;
|
||||||
|
numOfRows++;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||||
|
pShow->numOfReads += numOfRows;
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
}
|
|
@ -187,7 +187,7 @@ char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* strtolower(char *dst, const char *src) {
|
char* strtolower(char *dst, const char *src) {
|
||||||
int esc = 0;
|
int32_t esc = 0;
|
||||||
char quote = 0, *p = dst, c;
|
char quote = 0, *p = dst, c;
|
||||||
|
|
||||||
assert(dst != NULL);
|
assert(dst != NULL);
|
||||||
|
@ -214,7 +214,7 @@ char* strtolower(char *dst, const char *src) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* strntolower(char *dst, const char *src, int32_t n) {
|
char* strntolower(char *dst, const char *src, int32_t n) {
|
||||||
int esc = 0;
|
int32_t esc = 0;
|
||||||
char quote = 0, *p = dst, c;
|
char quote = 0, *p = dst, c;
|
||||||
|
|
||||||
assert(dst != NULL);
|
assert(dst != NULL);
|
||||||
|
@ -347,7 +347,7 @@ char *strbetween(char *string, char *begin, char *end) {
|
||||||
char *_begin = strstr(string, begin);
|
char *_begin = strstr(string, begin);
|
||||||
if (_begin != NULL) {
|
if (_begin != NULL) {
|
||||||
char *_end = strstr(_begin + strlen(begin), end);
|
char *_end = strstr(_begin + strlen(begin), end);
|
||||||
int size = (int)(_end - _begin);
|
int32_t size = (int32_t)(_end - _begin);
|
||||||
if (_end != NULL && size > 0) {
|
if (_end != NULL && size > 0) {
|
||||||
result = (char *)calloc(1, size);
|
result = (char *)calloc(1, size);
|
||||||
memcpy(result, _begin + strlen(begin), size - +strlen(begin));
|
memcpy(result, _begin + strlen(begin), size - +strlen(begin));
|
||||||
|
@ -402,7 +402,7 @@ int32_t taosHexStrToByteArray(char hexstr[], char bytes[]) {
|
||||||
|
|
||||||
char *taosIpStr(uint32_t ipInt) {
|
char *taosIpStr(uint32_t ipInt) {
|
||||||
static char ipStrArray[3][30];
|
static char ipStrArray[3][30];
|
||||||
static int ipStrIndex = 0;
|
static int32_t ipStrIndex = 0;
|
||||||
|
|
||||||
char *ipStr = ipStrArray[(ipStrIndex++) % 3];
|
char *ipStr = ipStrArray[(ipStrIndex++) % 3];
|
||||||
//sprintf(ipStr, "0x%x:%u.%u.%u.%u", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
|
//sprintf(ipStr, "0x%x:%u.%u.%u.%u", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
|
||||||
|
@ -417,3 +417,16 @@ void taosIp2String(uint32_t ip, char *str) {
|
||||||
void taosIpPort2String(uint32_t ip, uint16_t port, char *str) {
|
void taosIpPort2String(uint32_t ip, uint16_t port, char *str) {
|
||||||
sprintf(str, "%u.%u.%u.%u:%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24), port);
|
sprintf(str, "%u.%u.%u.%u:%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24), port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
|
||||||
|
*port = 0;
|
||||||
|
strcpy(fqdn, ep);
|
||||||
|
|
||||||
|
char *temp = strchr(fqdn, ':');
|
||||||
|
if (temp) {
|
||||||
|
*temp = 0;
|
||||||
|
*port = atoi(temp + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue