TD-10431 handle status msg

This commit is contained in:
Shengliang Guan 2021-11-30 19:42:51 +08:00
parent 9c57ee0588
commit bb887daf81
14 changed files with 429 additions and 242 deletions

View File

@ -63,6 +63,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_USER, "drop-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DNODE, "create-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DNODE, "create-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DNODE, "drop-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DNODE, "drop-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE, "drop-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DB, "create-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DB, "create-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DB, "drop-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DB, "drop-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_USE_DB, "use-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_USE_DB, "use-db" )
@ -631,7 +633,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t statusInterval; int32_t statusInterval;
int8_t reserved[4]; int32_t mnodeEqualVnodeNum;
int64_t checkTime; // 1970-01-01 00:00:00.000 int64_t checkTime; // 1970-01-01 00:00:00.000
char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone
char locale[TSDB_LOCALE_LEN]; // tsLocale char locale[TSDB_LOCALE_LEN]; // tsLocale
@ -654,11 +656,14 @@ typedef struct {
} SVnodeLoads; } SVnodeLoads;
typedef struct SStatusMsg { typedef struct SStatusMsg {
uint32_t sversion; int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
uint32_t rebootTime; // time stamp for last reboot uint32_t rebootTime; // time stamp for last reboot
int32_t numOfCores; int16_t numOfCores;
int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes;
char dnodeEp[TSDB_EP_LEN]; char dnodeEp[TSDB_EP_LEN];
SClusterCfg clusterCfg; SClusterCfg clusterCfg;
SVnodeLoads vnodeLoads; SVnodeLoads vnodeLoads;

View File

@ -26,94 +26,24 @@ extern "C" {
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct { typedef struct {
/**
* @brief software version of the program.
*
*/
int32_t sver; int32_t sver;
int16_t numOfCores;
/** int16_t numOfSupportMnodes;
* @brief num of CPU cores. int16_t numOfSupportVnodes;
* int16_t numOfSupportQnodes;
*/
int32_t numOfCores;
/**
* @brief number of threads per CPU core.
*
*/
float numOfThreadsPerCore;
/**
* @brief the proportion of total CPU cores available for query processing.
*
*/
float ratioOfQueryCores;
/**
* @brief max number of connections allowed in dnode.
*
*/
int32_t maxShellConns;
/**
* @brief time interval of heart beat from shell to dnode, seconds.
*
*/
int32_t shellActivityTimer;
/**
* @brief time interval of dnode status reporting to mnode, seconds, for cluster only.
*
*/
int32_t statusInterval; int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
/** float numOfThreadsPerCore;
* @brief first port number for the connection (12 continuous UDP/TCP port number are used). float ratioOfQueryCores;
* int32_t maxShellConns;
*/ int32_t shellActivityTimer;
uint16_t serverPort; uint16_t serverPort;
/**
* @brief data file's directory.
*
*/
char dataDir[TSDB_FILENAME_LEN]; char dataDir[TSDB_FILENAME_LEN];
/**
* @brief local endpoint.
*
*/
char localEp[TSDB_EP_LEN]; char localEp[TSDB_EP_LEN];
/**
* @brieflocal fully qualified domain name (FQDN).
*
*/
char localFqdn[TSDB_FQDN_LEN]; char localFqdn[TSDB_FQDN_LEN];
/**
* @brief first fully qualified domain name (FQDN) for TDengine system.
*
*/
char firstEp[TSDB_EP_LEN]; char firstEp[TSDB_EP_LEN];
/**
* @brief system time zone.
*
*/
char timezone[TSDB_TIMEZONE_LEN]; char timezone[TSDB_TIMEZONE_LEN];
/**
* @brief system locale.
*
*/
char locale[TSDB_LOCALE_LEN]; char locale[TSDB_LOCALE_LEN];
/**
* @briefdefault system charset.
*
*/
char charset[TSDB_LOCALE_LEN]; char charset[TSDB_LOCALE_LEN];
} SDnodeOpt; } SDnodeOpt;

View File

@ -30,133 +30,36 @@ typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg); typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg);
typedef struct SMnodeLoad { typedef struct SMnodeLoad {
/**
* @brief the number of dnodes in cluster.
*
*/
int64_t numOfDnode; int64_t numOfDnode;
/**
* @brief the number of mnodes in cluster.
*
*/
int64_t numOfMnode; int64_t numOfMnode;
/**
* @brief the number of vgroups in cluster.
*
*/
int64_t numOfVgroup; int64_t numOfVgroup;
/**
* @brief the number of databases in cluster.
*
*/
int64_t numOfDatabase; int64_t numOfDatabase;
/**
* @brief the number of super tables in cluster.
*
*/
int64_t numOfSuperTable; int64_t numOfSuperTable;
/**
* @brief the number of child tables in cluster.
*
*/
int64_t numOfChildTable; int64_t numOfChildTable;
/**
* @brief the number of normal tables in cluster.
*
*/
int64_t numOfNormalTable; int64_t numOfNormalTable;
/**
* @brief the number of numOfTimeseries in cluster.
*
*/
int64_t numOfColumn; int64_t numOfColumn;
/**
* @brief total points written in cluster.
*
*/
int64_t totalPoints; int64_t totalPoints;
/**
* @brief total storage in cluster.
*
*/
int64_t totalStorage; int64_t totalStorage;
/**
* @brief total compressed storage in cluster.
*
*/
int64_t compStorage; int64_t compStorage;
} SMnodeLoad; } SMnodeLoad;
typedef struct { typedef struct {
/**
* @brief dnodeId of this mnode.
*
*/
int32_t dnodeId; int32_t dnodeId;
/**
* @brief clusterId of this mnode.
*
*/
int64_t clusterId; int64_t clusterId;
/**
* @brief replica num of this mnode.
*
*/
int8_t replica; int8_t replica;
/**
* @brief self index in the array of replicas.
*
*/
int8_t selfIndex; int8_t selfIndex;
/**
* @brief detail replica information of this mnode.
*
*/
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
/**
* @brief the parent dnode of this mnode.
*
*/
SDnode *pDnode; SDnode *pDnode;
/**
* @brief put apply msg to the write queue in dnode.
*
*/
PutMsgToMnodeQFp putMsgToApplyMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp;
/**
* @brief the callback function while send msg to dnode.
*
*/
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
/**
* @brief the callback function while send msg to mnode.
*
*/
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
/**
* @brief the callback function while send redirect msg to clients or peers.
*
*/
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
int32_t sver;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
char *timezone;
char *locale;
char *charset;
} SMnodeOpt; } SMnodeOpt;
/* ------------------------ SMnode ------------------------ */ /* ------------------------ SMnode ------------------------ */

View File

@ -138,11 +138,15 @@ void dmnWaitSignal() {
void dmnInitOption(SDnodeOpt *pOption) { void dmnInitOption(SDnodeOpt *pOption) {
pOption->sver = tsVersion; pOption->sver = tsVersion;
pOption->numOfCores = tsNumOfCores; pOption->numOfCores = tsNumOfCores;
pOption->numOfSupportMnodes = 1;
pOption->numOfSupportVnodes = 1;
pOption->numOfSupportQnodes = 1;
pOption->statusInterval = tsStatusInterval;
pOption->mnodeEqualVnodeNum = tsMnodeEqualVnodeNum;
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
pOption->ratioOfQueryCores = tsRatioOfQueryCores; pOption->ratioOfQueryCores = tsRatioOfQueryCores;
pOption->maxShellConns = tsMaxShellConns; pOption->maxShellConns = tsMaxShellConns;
pOption->shellActivityTimer = tsShellActivityTimer; pOption->shellActivityTimer = tsShellActivityTimer;
pOption->statusInterval = tsStatusInterval;
pOption->serverPort = tsServerPort; pOption->serverPort = tsServerPort;
tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN); tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN);
tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN); tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN);

View File

@ -348,19 +348,25 @@ static void dndSendStatusMsg(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
pStatus->sversion = htonl(pDnode->opt.sver); pStatus->sver = htonl(pDnode->opt.sver);
pStatus->dnodeId = htonl(pMgmt->dnodeId); pStatus->dnodeId = htonl(pMgmt->dnodeId);
pStatus->clusterId = htobe64(pMgmt->clusterId); pStatus->clusterId = htobe64(pMgmt->clusterId);
pStatus->rebootTime = htonl(pMgmt->rebootTime); pStatus->rebootTime = htonl(pMgmt->rebootTime);
pStatus->numOfCores = htonl(pDnode->opt.numOfCores); pStatus->numOfCores = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportQnodes = htons(pDnode->opt.numOfCores);
tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN); pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pDnode->opt.mnodeEqualVnodeNum);
tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN);
pStatus->clusterCfg.checkTime = 0; pStatus->clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime);
tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN);
tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);

View File

@ -332,6 +332,12 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->sver = pDnode->opt.sver;
pOption->statusInterval = pDnode->opt.statusInterval;
pOption->mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum;
pOption->timezone = pDnode->opt.timezone;
pOption->charset = pDnode->opt.charset;
pOption->locale = pDnode->opt.locale;
} }
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {

View File

@ -10,4 +10,5 @@ target_link_libraries(
PRIVATE sdb PRIVATE sdb
PRIVATE transport PRIVATE transport
PRIVATE cjson PRIVATE cjson
PRIVATE sync
) )

View File

@ -24,6 +24,7 @@
#include "thash.h" #include "thash.h"
#include "cJSON.h" #include "cJSON.h"
#include "mnode.h" #include "mnode.h"
#include "sync.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -89,24 +90,14 @@ typedef enum {
DND_REASON_ONLINE = 0, DND_REASON_ONLINE = 0,
DND_REASON_STATUS_MSG_TIMEOUT, DND_REASON_STATUS_MSG_TIMEOUT,
DND_REASON_STATUS_NOT_RECEIVED, DND_REASON_STATUS_NOT_RECEIVED,
DND_REASON_RESET_BY_MNODE,
DND_REASON_VERSION_NOT_MATCH, DND_REASON_VERSION_NOT_MATCH,
DND_REASON_DNODE_ID_NOT_MATCH, DND_REASON_DNODE_ID_NOT_MATCH,
DND_REASON_CLUSTER_ID_NOT_MATCH, DND_REASON_CLUSTER_ID_NOT_MATCH,
DND_REASON_NUM_OF_MNODES_NOT_MATCH,
DND_REASON_ENABLE_BALANCE_NOT_MATCH,
DND_REASON_MN_EQUAL_VN_NOT_MATCH, DND_REASON_MN_EQUAL_VN_NOT_MATCH,
DND_REASON_OFFLINE_THRESHOLD_NOT_MATCH,
DND_REASON_STATUS_INTERVAL_NOT_MATCH, DND_REASON_STATUS_INTERVAL_NOT_MATCH,
DND_REASON_MAX_TAB_PER_VN_NOT_MATCH,
DND_REASON_MAX_VG_PER_DB_NOT_MATCH,
DND_REASON_ARBITRATOR_NOT_MATCH,
DND_REASON_TIME_ZONE_NOT_MATCH, DND_REASON_TIME_ZONE_NOT_MATCH,
DND_REASON_LOCALE_NOT_MATCH, DND_REASON_LOCALE_NOT_MATCH,
DND_REASON_CHARSET_NOT_MATCH, DND_REASON_CHARSET_NOT_MATCH,
DND_REASON_FLOW_CTRL_NOT_MATCH,
DND_REASON_SLAVE_QUERY_NOT_MATCH,
DND_REASON_ADJUST_MASTER_NOT_MATCH,
DND_REASON_OTHERS DND_REASON_OTHERS
} EDndReason; } EDndReason;
@ -135,13 +126,14 @@ typedef struct SDnodeObj {
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
int64_t rebootTime; int64_t rebootTime;
int64_t lastAccessTime; int32_t accessTimes;
int16_t numOfMnodes; int16_t numOfMnodes;
int16_t numOfVnodes; int16_t numOfVnodes;
int16_t numOfQnodes; int16_t numOfQnodes;
int16_t numOfSupportMnodes; int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes; int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes; int16_t numOfSupportQnodes;
int16_t numOfCores;
EDndStatus status; EDndStatus status;
EDndReason offlineReason; EDndReason offlineReason;
uint16_t port; uint16_t port;
@ -153,8 +145,7 @@ typedef struct SMnodeObj {
int32_t id; int32_t id;
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
int8_t status; ESyncState role;
int8_t role;
int32_t roleTerm; int32_t roleTerm;
int64_t roleTime; int64_t roleTime;
SDnodeObj *pDnode; SDnodeObj *pDnode;

View File

@ -24,6 +24,8 @@ extern "C" {
int32_t mndInitDnode(SMnode *pMnode); int32_t mndInitDnode(SMnode *pMnode);
void mndCleanupDnode(SMnode *pMnode); void mndCleanupDnode(SMnode *pMnode);
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId);
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -50,6 +50,12 @@ typedef struct SMnode {
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
PutMsgToMnodeQFp putMsgToApplyMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp;
int32_t sver;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
char *timezone;
char *locale;
char *charset;
} SMnode; } SMnode;
tmr_h mndGetTimer(SMnode *pMnode); tmr_h mndGetTimer(SMnode *pMnode);

View File

@ -24,8 +24,7 @@ extern "C" {
int32_t mndInitMnode(SMnode *pMnode); int32_t mndInitMnode(SMnode *pMnode);
void mndCleanupMnode(SMnode *pMnode); void mndCleanupMnode(SMnode *pMnode);
void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -14,12 +14,30 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "ttime.h"
#define SDB_DNODE_VER 1 #define SDB_DNODE_VER 1
static char *offlineReason[] = {
"",
"status msg timeout",
"status not received",
"version not match",
"dnodeId not match",
"clusterId not match",
"mnEqualVn not match",
"interval not match",
"timezone not match",
"locale not match",
"charset not match",
"unknown",
};
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) { static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_DNODE_VER, sizeof(SDnodeObj)); SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, SDB_DNODE_VER, sizeof(SDnodeObj));
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
@ -59,7 +77,8 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
static void mnodeResetDnode(SDnodeObj *pDnode) { static void mnodeResetDnode(SDnodeObj *pDnode) {
pDnode->rebootTime = 0; pDnode->rebootTime = 0;
pDnode->lastAccessTime = 0; pDnode->accessTimes = 0;
pDnode->numOfCores = 0;
pDnode->numOfMnodes = 0; pDnode->numOfMnodes = 0;
pDnode->numOfVnodes = 0; pDnode->numOfVnodes = 0;
pDnode->numOfQnodes = 0; pDnode->numOfQnodes = 0;
@ -67,7 +86,7 @@ static void mnodeResetDnode(SDnodeObj *pDnode) {
pDnode->numOfSupportVnodes = 0; pDnode->numOfSupportVnodes = 0;
pDnode->numOfSupportQnodes = 0; pDnode->numOfSupportQnodes = 0;
pDnode->status = DND_STATUS_OFFLINE; pDnode->status = DND_STATUS_OFFLINE;
pDnode->offlineReason = DND_REASON_RESET_BY_MNODE; pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port); snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port);
} }
@ -89,7 +108,7 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj
static int32_t mndCreateDefaultDnode(SMnode *pMnode) { static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
SDnodeObj dnodeObj = {0}; SDnodeObj dnodeObj = {0};
dnodeObj.id = 0; dnodeObj.id = 1;
dnodeObj.createdTime = taosGetTimestampMs(); dnodeObj.createdTime = taosGetTimestampMs();
dnodeObj.updateTime = dnodeObj.createdTime; dnodeObj.updateTime = dnodeObj.createdTime;
dnodeObj.port = pMnode->replicas[0].port; dnodeObj.port = pMnode->replicas[0].port;
@ -102,6 +121,191 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
if (strncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) {
sdbCancelFetch(pSdb, pIter);
return pDnode;
}
}
return NULL;
}
static int32_t mndGetDnodeSize(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
return sdbGetSize(pSdb, SDB_DNODE);
}
static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
SSdb *pSdb = pMnode->pSdb;
int32_t i = 0;
void *pIter = NULL;
while (1) {
SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
if (i >= numOfEps) {
sdbCancelFetch(pSdb, pIter);
break;
}
SDnodeEp *pEp = &pEps->eps[i];
pEp->id = htonl(pDnode->id);
pEp->port = htons(pDnode->port);
memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
pEp->isMnode = 0;
if (mndIsMnode(pMnode, pDnode->id)) {
pEp->isMnode = 1;
}
i++;
}
pEps->num = i;
}
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
if (pCfg->mnodeEqualVnodeNum != pMnode->mnodeEqualVnodeNum) {
mError("\"mnodeEqualVnodeNum\"[%d - %d] cfg inconsistent", pCfg->mnodeEqualVnodeNum, pMnode->mnodeEqualVnodeNum);
return DND_REASON_MN_EQUAL_VN_NOT_MATCH;
}
if (pCfg->statusInterval != pMnode->statusInterval) {
mError("\"statusInterval\"[%d - %d] cfg inconsistent", pCfg->statusInterval, pMnode->statusInterval);
return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
}
int64_t checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
if ((0 != strcasecmp(pCfg->timezone, pMnode->timezone)) && (checkTime != pCfg->checkTime)) {
mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, tsTimezone,
pCfg->checkTime, checkTime);
return DND_REASON_TIME_ZONE_NOT_MATCH;
}
if (0 != strcasecmp(pCfg->locale, pMnode->locale)) {
mError("\"locale\"[%s - %s] cfg parameters inconsistent", pCfg->locale, pMnode->locale);
return DND_REASON_LOCALE_NOT_MATCH;
}
if (0 != strcasecmp(pCfg->charset, pMnode->charset)) {
mError("\"charset\"[%s - %s] cfg parameters inconsistent.", pCfg->charset, pMnode->charset);
return DND_REASON_CHARSET_NOT_MATCH;
}
return 0;
}
static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
pStatus->sver = htonl(pStatus->sver);
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htobe64(pStatus->clusterId);
pStatus->rebootTime = htonl(pStatus->rebootTime);
pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes);
pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes);
pStatus->numOfSupportQnodes = htons(pStatus->numOfSupportQnodes);
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pStatus->clusterCfg.mnodeEqualVnodeNum);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp);
if (pDnode == NULL) {
mDebug("dnode:%s, not created yet", pStatus->dnodeEp);
return TSDB_CODE_MND_DNODE_NOT_EXIST;
}
} else {
pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId);
if (pDnode == NULL) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp);
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
}
mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp);
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_DNODE_NOT_EXIST;
}
}
if (pStatus->sver != pMnode->sver) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
}
mndReleaseDnode(pMnode, pDnode);
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->sver);
return TSDB_CODE_MND_INVALID_MSG_VERSION;
}
int64_t clusterId = mndGetClusterId(pMnode);
if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, clusterId);
} else {
if (pStatus->clusterId != clusterId) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
}
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, clusterId);
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_INVALID_CLUSTER_ID;
} else {
pDnode->accessTimes++;
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
}
}
if (pDnode->status == DND_STATUS_OFFLINE) {
// Verify whether the cluster parameters are consistent when status change from offline to ready
int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg);
if (0 != ret) {
pDnode->offlineReason = ret;
mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]);
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT;
}
mInfo("dnode:%d, from offline to online", pDnode->id);
}
pDnode->rebootTime = pStatus->rebootTime;
pDnode->numOfCores = pStatus->numOfCores;
pDnode->numOfSupportMnodes = pStatus->numOfSupportMnodes;
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes;
pDnode->numOfSupportQnodes = pStatus->numOfSupportQnodes;
pDnode->status = DND_STATUS_READY;
int32_t numOfEps = mndGetDnodeSize(pMnode);
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);
SStatusRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_OUT_OF_MEMORY;
}
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.dropped = 0;
pRsp->dnodeCfg.clusterId = htobe64(clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
pMsg->rpcRsp.len = contLen;
pMsg->rpcRsp.rsp = pRsp;
mndReleaseDnode(pMnode, pDnode);
return 0;
}
static int32_t mndProcessCreateDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCreateDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
@ -109,8 +313,8 @@ static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return
static int32_t mndProcessConfigDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessConfigDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
int32_t mndInitDnode(SMnode *pMnode) { int32_t mndInitDnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_USER, SSdbTable table = {.sdbType = SDB_DNODE,
.keyType = SDB_KEY_BINARY, .keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultDnode, .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
.encodeFp = (SdbEncodeFp)mndDnodeActionEncode, .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndDnodeActionDecode, .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
@ -121,8 +325,19 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS_RSP, mndProcessStatusMsg);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
void mndCleanupDnode(SMnode *pMnode) {} void mndCleanupDnode(SMnode *pMnode) {}
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
}
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pDnode);
}

View File

@ -14,11 +14,120 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "mndTrans.h"
#include "mndInt.h"
#define SDB_MNODE_VER 1
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pMnodeObj) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, SDB_MNODE_VER, sizeof(SMnodeObj));
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pMnodeObj->id);
SDB_SET_INT64(pRaw, dataPos, pMnodeObj->createdTime)
SDB_SET_INT64(pRaw, dataPos, pMnodeObj->updateTime)
return pRaw;
}
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_MNODE_VER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode mnode since %s", terrstr());
return NULL;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj));
SMnodeObj *pMnodeObj = sdbGetRowObj(pRow);
if (pMnodeObj == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pMnodeObj->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pMnodeObj->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pMnodeObj->updateTime)
return pRow;
}
static void mnodeResetMnode(SMnodeObj *pMnodeObj) {
pMnodeObj->role = TAOS_SYNC_STATE_FOLLOWER;
pMnodeObj->roleTerm = 0;
pMnodeObj->roleTime = 0;
}
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) {
pMnodeObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pMnodeObj->id);
if (pMnodeObj->pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1;
}
mnodeResetMnode(pMnodeObj);
return 0;
}
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) {
if (pMnodeObj->pDnode != NULL) {
sdbRelease(pSdb, pMnodeObj->pDnode);
pMnodeObj->pDnode = NULL;
}
return 0;
}
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj *pDstMnode) {
pSrcMnode->id = pDstMnode->id;
pSrcMnode->createdTime = pDstMnode->createdTime;
pSrcMnode->updateTime = pDstMnode->updateTime;
mnodeResetMnode(pSrcMnode);
}
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
SMnodeObj mnodeObj = {0};
mnodeObj.id = 0;
mnodeObj.createdTime = taosGetTimestampMs();
mnodeObj.updateTime = mnodeObj.createdTime;
SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndProcessCreateMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
int32_t mndInitMnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_MNODE,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultMnode,
.encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
.insertFp = (SdbInsertFp)mndMnodeActionInsert,
.updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndMnodeActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeMsg);
return sdbSetTable(pMnode->pSdb, table);
}
int32_t mndInitMnode(SMnode *pMnode) { return 0; }
void mndCleanupMnode(SMnode *pMnode) {} void mndCleanupMnode(SMnode *pMnode) {}
void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {} bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {} SSdb *pSdb = pMnode->pSdb;
SMnodeObj *pMnodeObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId);
if (pMnodeObj == NULL) {
return false;
}
sdbRelease(pSdb, pMnodeObj);
return true;
}

View File

@ -225,15 +225,22 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
pMnode->sver = pOption->sver;
pMnode->statusInterval = pOption->statusInterval;
pMnode->mnodeEqualVnodeNum = pOption->mnodeEqualVnodeNum;
pMnode->timezone = strdup(pOption->timezone);
pMnode->locale = strdup(pOption->locale);
pMnode->charset = strdup(pOption->charset);
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL) { pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
pMnode->statusInterval < 1 || pOption->mnodeEqualVnodeNum < 0) {
terrno = TSDB_CODE_MND_APP_ERROR; terrno = TSDB_CODE_MND_APP_ERROR;
return -1; return -1;
} }
if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) { if (pMnode->timezone == NULL || pMnode->locale == NULL || pMnode->charset == NULL) {
terrno = TSDB_CODE_MND_APP_ERROR; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
@ -299,6 +306,9 @@ void mndClose(SMnode *pMnode) {
mDebug("start to close mnode"); mDebug("start to close mnode");
mndCleanupSteps(pMnode, -1); mndCleanupSteps(pMnode, -1);
tfree(pMnode->path); tfree(pMnode->path);
tfree(pMnode->charset);
tfree(pMnode->locale);
tfree(pMnode->timezone);
tfree(pMnode); tfree(pMnode);
mDebug("mnode is closed"); mDebug("mnode is closed");
} }