diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index e70a8539c3..b655016f3c 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -63,6 +63,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_USER, "drop-user" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DNODE, "create-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DNODE, "drop-dnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE, "create-mnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE, "drop-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DB, "create-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DB, "drop-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_USE_DB, "use-db" ) @@ -631,7 +633,7 @@ typedef struct { typedef struct { int32_t statusInterval; - int8_t reserved[4]; + int32_t mnodeEqualVnodeNum; int64_t checkTime; // 1970-01-01 00:00:00.000 char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone char locale[TSDB_LOCALE_LEN]; // tsLocale @@ -654,11 +656,14 @@ typedef struct { } SVnodeLoads; typedef struct SStatusMsg { - uint32_t sversion; + int32_t sver; int32_t dnodeId; int64_t clusterId; uint32_t rebootTime; // time stamp for last reboot - int32_t numOfCores; + int16_t numOfCores; + int16_t numOfSupportMnodes; + int16_t numOfSupportVnodes; + int16_t numOfSupportQnodes; char dnodeEp[TSDB_EP_LEN]; SClusterCfg clusterCfg; SVnodeLoads vnodeLoads; diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index fe9560d427..50886932ce 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -26,95 +26,25 @@ extern "C" { typedef struct SDnode SDnode; typedef struct { - /** - * @brief software version of the program. - * - */ - int32_t sver; - - /** - * @brief num of CPU cores. - * - */ - int32_t numOfCores; - - /** - * @brief number of threads per CPU core. - * - */ - float numOfThreadsPerCore; - - /** - * @brief the proportion of total CPU cores available for query processing. - * - */ - float ratioOfQueryCores; - - /** - * @brief max number of connections allowed in dnode. - * - */ - int32_t maxShellConns; - - /** - * @brief time interval of heart beat from shell to dnode, seconds. - * - */ - int32_t shellActivityTimer; - - /** - * @brief time interval of dnode status reporting to mnode, seconds, for cluster only. - * - */ - int32_t statusInterval; - - /** - * @brief first port number for the connection (12 continuous UDP/TCP port number are used). - * - */ + int32_t sver; + int16_t numOfCores; + int16_t numOfSupportMnodes; + int16_t numOfSupportVnodes; + int16_t numOfSupportQnodes; + int32_t statusInterval; + int32_t mnodeEqualVnodeNum; + float numOfThreadsPerCore; + float ratioOfQueryCores; + int32_t maxShellConns; + int32_t shellActivityTimer; uint16_t serverPort; - - /** - * @brief data file's directory. - * - */ - char dataDir[TSDB_FILENAME_LEN]; - - /** - * @brief local endpoint. - * - */ - char localEp[TSDB_EP_LEN]; - - /** - * @brieflocal fully qualified domain name (FQDN). - * - */ - char localFqdn[TSDB_FQDN_LEN]; - - /** - * @brief first fully qualified domain name (FQDN) for TDengine system. - * - */ - char firstEp[TSDB_EP_LEN]; - - /** - * @brief system time zone. - * - */ - char timezone[TSDB_TIMEZONE_LEN]; - - /** - * @brief system locale. - * - */ - char locale[TSDB_LOCALE_LEN]; - - /** - * @briefdefault system charset. - * - */ - char charset[TSDB_LOCALE_LEN]; + char dataDir[TSDB_FILENAME_LEN]; + char localEp[TSDB_EP_LEN]; + char localFqdn[TSDB_FQDN_LEN]; + char firstEp[TSDB_EP_LEN]; + char timezone[TSDB_TIMEZONE_LEN]; + char locale[TSDB_LOCALE_LEN]; + char charset[TSDB_LOCALE_LEN]; } SDnodeOpt; /* ------------------------ SDnode ------------------------ */ diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 5066b881b5..a8a8117886 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -30,133 +30,36 @@ typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg); typedef struct SMnodeLoad { - /** - * @brief the number of dnodes in cluster. - * - */ int64_t numOfDnode; - - /** - * @brief the number of mnodes in cluster. - * - */ int64_t numOfMnode; - - /** - * @brief the number of vgroups in cluster. - * - */ int64_t numOfVgroup; - - /** - * @brief the number of databases in cluster. - * - */ int64_t numOfDatabase; - - /** - * @brief the number of super tables in cluster. - * - */ int64_t numOfSuperTable; - - /** - * @brief the number of child tables in cluster. - * - */ int64_t numOfChildTable; - - /** - * @brief the number of normal tables in cluster. - * - */ int64_t numOfNormalTable; - - /** - * @brief the number of numOfTimeseries in cluster. - * - */ int64_t numOfColumn; - - /** - * @brief total points written in cluster. - * - */ int64_t totalPoints; - - /** - * @brief total storage in cluster. - * - */ int64_t totalStorage; - - /** - * @brief total compressed storage in cluster. - * - */ int64_t compStorage; } SMnodeLoad; typedef struct { - /** - * @brief dnodeId of this mnode. - * - */ - int32_t dnodeId; - - /** - * @brief clusterId of this mnode. - * - */ - int64_t clusterId; - - /** - * @brief replica num of this mnode. - * - */ - int8_t replica; - - /** - * @brief self index in the array of replicas. - * - */ - int8_t selfIndex; - - /** - * @brief detail replica information of this mnode. - * - */ - SReplica replicas[TSDB_MAX_REPLICA]; - - /** - * @brief the parent dnode of this mnode. - * - */ - SDnode *pDnode; - - /** - * @brief put apply msg to the write queue in dnode. - * - */ - PutMsgToMnodeQFp putMsgToApplyMsgFp; - - /** - * @brief the callback function while send msg to dnode. - * - */ - SendMsgToDnodeFp sendMsgToDnodeFp; - - /** - * @brief the callback function while send msg to mnode. - * - */ - SendMsgToMnodeFp sendMsgToMnodeFp; - - /** - * @brief the callback function while send redirect msg to clients or peers. - * - */ + int32_t dnodeId; + int64_t clusterId; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + SDnode *pDnode; + PutMsgToMnodeQFp putMsgToApplyMsgFp; + SendMsgToDnodeFp sendMsgToDnodeFp; + SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; + int32_t sver; + int32_t statusInterval; + int32_t mnodeEqualVnodeNum; + char *timezone; + char *locale; + char *charset; } SMnodeOpt; /* ------------------------ SMnode ------------------------ */ diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 3648af5fcb..8bd3db0217 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -38,6 +38,15 @@ extern "C" { dataPos += sizeof(int32_t); \ } +#define SDB_GET_INT16(pData, pRow, dataPos, val) \ + { \ + if (sdbGetRawInt16(pRaw, dataPos, val) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += sizeof(int16_t); \ + } + #define SDB_GET_INT8(pData, pRow, dataPos, val) \ { \ if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \ @@ -74,6 +83,15 @@ extern "C" { dataPos += sizeof(int32_t); \ } +#define SDB_SET_INT16(pRaw, dataPos, val) \ + { \ + if (sdbSetRawInt16(pRaw, dataPos, val) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + } \ + dataPos += sizeof(int16_t); \ + } + #define SDB_SET_INT8(pRaw, dataPos, val) \ { \ if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \ @@ -100,6 +118,7 @@ extern "C" { } \ } +typedef struct SMnode SMnode; typedef struct SSdbRaw SSdbRaw; typedef struct SSdbRow SSdbRow; typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; @@ -130,7 +149,7 @@ typedef struct SSdb SSdb; typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj); -typedef int32_t (*SdbDeployFp)(SSdb *pSdb); +typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); @@ -190,6 +209,12 @@ typedef struct SSdbOpt { * */ const char *path; + + /** + * @brief The mnode object. + * + */ + SMnode *pMnode; } SSdbOpt; /** @@ -291,12 +316,14 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type); SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val); +int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val); int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val); int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val); int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen); int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen); int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status); int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val); +int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val); int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val); int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val); int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen); diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index a0ca0dd390..08624f20f4 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -138,11 +138,15 @@ void dmnWaitSignal() { void dmnInitOption(SDnodeOpt *pOption) { pOption->sver = tsVersion; pOption->numOfCores = tsNumOfCores; + pOption->numOfSupportMnodes = 1; + pOption->numOfSupportVnodes = 1; + pOption->numOfSupportQnodes = 1; + pOption->statusInterval = tsStatusInterval; + pOption->mnodeEqualVnodeNum = tsMnodeEqualVnodeNum; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->ratioOfQueryCores = tsRatioOfQueryCores; pOption->maxShellConns = tsMaxShellConns; pOption->shellActivityTimer = tsShellActivityTimer; - pOption->statusInterval = tsStatusInterval; pOption->serverPort = tsServerPort; tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN); tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN); diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 4265c8a3cd..7b8afa96bb 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -348,19 +348,25 @@ static void dndSendStatusMsg(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); - pStatus->sversion = htonl(pDnode->opt.sver); + pStatus->sver = htonl(pDnode->opt.sver); pStatus->dnodeId = htonl(pMgmt->dnodeId); pStatus->clusterId = htobe64(pMgmt->clusterId); pStatus->rebootTime = htonl(pMgmt->rebootTime); - pStatus->numOfCores = htonl(pDnode->opt.numOfCores); + pStatus->numOfCores = htons(pDnode->opt.numOfCores); + pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores); + pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfCores); + pStatus->numOfSupportQnodes = htons(pDnode->opt.numOfCores); tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); + pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); - tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN); - tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN); - tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN); + pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pDnode->opt.mnodeEqualVnodeNum); pStatus->clusterCfg.checkTime = 0; char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime); + tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN); + tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN); + tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN); taosRUnLockLatch(&pMgmt->latch); dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index ca48642899..342d14463a 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -332,6 +332,12 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); + pOption->sver = pDnode->opt.sver; + pOption->statusInterval = pDnode->opt.statusInterval; + pOption->mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum; + pOption->timezone = pDnode->opt.timezone; + pOption->charset = pDnode->opt.charset; + pOption->locale = pDnode->opt.locale; } static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 6fecb457c0..98c604e520 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -10,4 +10,5 @@ target_link_libraries( PRIVATE sdb PRIVATE transport PRIVATE cjson + PRIVATE sync ) \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9b8c6eccc8..ded66edaa7 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -24,6 +24,7 @@ #include "thash.h" #include "cJSON.h" #include "mnode.h" +#include "sync.h" #ifdef __cplusplus extern "C" { @@ -78,6 +79,28 @@ typedef enum { typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy; +typedef enum { + DND_STATUS_OFFLINE = 0, + DND_STATUS_READY = 1, + DND_STATUS_CREATING = 2, + DND_STATUS_DROPPING = 3 +} EDndStatus; + +typedef enum { + DND_REASON_ONLINE = 0, + DND_REASON_STATUS_MSG_TIMEOUT, + DND_REASON_STATUS_NOT_RECEIVED, + DND_REASON_VERSION_NOT_MATCH, + DND_REASON_DNODE_ID_NOT_MATCH, + DND_REASON_CLUSTER_ID_NOT_MATCH, + DND_REASON_MN_EQUAL_VN_NOT_MATCH, + DND_REASON_STATUS_INTERVAL_NOT_MATCH, + DND_REASON_TIME_ZONE_NOT_MATCH, + DND_REASON_LOCALE_NOT_MATCH, + DND_REASON_CHARSET_NOT_MATCH, + DND_REASON_OTHERS +} EDndReason; + typedef struct STrans { int32_t id; ETrnStage stage; @@ -99,29 +122,32 @@ typedef struct SClusterObj { } SClusterObj; typedef struct SDnodeObj { - int32_t id; - int32_t vnodes; - int64_t createdTime; - int64_t updateTime; - int64_t lastAccess; - int64_t rebootTime; // time stamp for last reboot - char fqdn[TSDB_FQDN_LEN]; - char ep[TSDB_EP_LEN]; - uint16_t port; - int16_t numOfCores; // from dnode status msg - int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode - int8_t status; // set in balance function - int8_t offlineReason; + int32_t id; + int64_t createdTime; + int64_t updateTime; + int64_t rebootTime; + int32_t accessTimes; + int16_t numOfMnodes; + int16_t numOfVnodes; + int16_t numOfQnodes; + int16_t numOfSupportMnodes; + int16_t numOfSupportVnodes; + int16_t numOfSupportQnodes; + int16_t numOfCores; + EDndStatus status; + EDndReason offlineReason; + uint16_t port; + char fqdn[TSDB_FQDN_LEN]; + char ep[TSDB_EP_LEN]; } SDnodeObj; typedef struct SMnodeObj { int32_t id; - int8_t status; - int8_t role; - int32_t roleTerm; - int64_t roleTime; int64_t createdTime; int64_t updateTime; + ESyncState role; + int32_t roleTerm; + int64_t roleTime; SDnodeObj *pDnode; } SMnodeObj; diff --git a/source/dnode/mnode/impl/inc/mndDnode.h b/source/dnode/mnode/impl/inc/mndDnode.h index d7bfdba122..9bb1ab7acd 100644 --- a/source/dnode/mnode/impl/inc/mndDnode.h +++ b/source/dnode/mnode/impl/inc/mndDnode.h @@ -22,8 +22,10 @@ extern "C" { #endif -int32_t mndInitDnode(SMnode *pMnode); -void mndCleanupDnode(SMnode *pMnode); +int32_t mndInitDnode(SMnode *pMnode); +void mndCleanupDnode(SMnode *pMnode); +SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId); +void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 2c7e597774..bb3f0ca263 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -50,6 +50,12 @@ typedef struct SMnode { SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp; + int32_t sver; + int32_t statusInterval; + int32_t mnodeEqualVnodeNum; + char *timezone; + char *locale; + char *charset; } SMnode; tmr_h mndGetTimer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index c57e1d42a5..53f7b733f2 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -24,8 +24,7 @@ extern "C" { int32_t mndInitMnode(SMnode *pMnode); void mndCleanupMnode(SMnode *pMnode); -void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect); -void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect); +bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 745c0a0447..9559a96255 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_TRANSACTION_INT_H_ -#define _TD_TRANSACTION_INT_H_ +#ifndef _TD_MND_TRANS_H_ +#define _TD_MND_TRANS_H_ #include "mndInt.h" @@ -44,4 +44,4 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); } #endif -#endif /*_TD_TRANSACTION_INT_H_*/ +#endif /*_TD_MND_TRANS_H_*/ diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index dd91de1c95..73b0167422 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -79,7 +79,7 @@ static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *p return 0; } -static int32_t mnodeCreateDefaultAcct(SSdb *pSdb) { +static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) { int32_t code = 0; SAcctObj acctObj = {0}; @@ -98,7 +98,7 @@ static int32_t mnodeCreateDefaultAcct(SSdb *pSdb) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - return sdbWrite(pSdb, pRaw); + return sdbWrite(pMnode->pSdb, pRaw); } int32_t mndInitAcct(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 11f3dc1ee9..067faba558 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -14,8 +14,330 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndTrans.h" +#include "ttime.h" -int32_t mndInitDnode(SMnode *pMnode) { return 0; } -void mndCleanupDnode(SMnode *pMnode) {} \ No newline at end of file +#define SDB_DNODE_VER 1 + +static char *offlineReason[] = { + "", + "status msg timeout", + "status not received", + "version not match", + "dnodeId not match", + "clusterId not match", + "mnEqualVn not match", + "interval not match", + "timezone not match", + "locale not match", + "charset not match", + "unknown", +}; + +static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) { + SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, SDB_DNODE_VER, sizeof(SDnodeObj)); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, pDnode->id); + SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime) + SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime) + SDB_SET_INT16(pRaw, dataPos, pDnode->port) + SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN) + SDB_SET_DATALEN(pRaw, dataPos); + + return pRaw; +} + +static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_DNODE_VER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode dnode since %s", terrstr()); + return NULL; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SDnodeObj)); + SDnodeObj *pDnode = sdbGetRowObj(pRow); + if (pDnode == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &pDnode->id) + SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->updateTime) + SDB_GET_INT16(pRaw, pRow, dataPos, &pDnode->port) + SDB_GET_BINARY(pRaw, pRow, dataPos, pDnode->fqdn, TSDB_FQDN_LEN) + + return pRow; +} + +static void mnodeResetDnode(SDnodeObj *pDnode) { + pDnode->rebootTime = 0; + pDnode->accessTimes = 0; + pDnode->numOfCores = 0; + pDnode->numOfMnodes = 0; + pDnode->numOfVnodes = 0; + pDnode->numOfQnodes = 0; + pDnode->numOfSupportMnodes = 0; + pDnode->numOfSupportVnodes = 0; + pDnode->numOfSupportQnodes = 0; + pDnode->status = DND_STATUS_OFFLINE; + pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED; + snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port); +} + +static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) { + mnodeResetDnode(pDnode); + return 0; +} + +static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { return 0; } + +static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj *pDstDnode) { + pSrcDnode->id = pDstDnode->id; + pSrcDnode->createdTime = pDstDnode->createdTime; + pSrcDnode->updateTime = pDstDnode->updateTime; + pSrcDnode->port = pDstDnode->port; + memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN); + mnodeResetDnode(pSrcDnode); +} + +static int32_t mndCreateDefaultDnode(SMnode *pMnode) { + SDnodeObj dnodeObj = {0}; + dnodeObj.id = 1; + dnodeObj.createdTime = taosGetTimestampMs(); + dnodeObj.updateTime = dnodeObj.createdTime; + dnodeObj.port = pMnode->replicas[0].port; + memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN); + + SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj); + if (pRaw == NULL) return -1; + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + + return sdbWrite(pMnode->pSdb, pRaw); +} + +static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) { + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + while (1) { + SDnodeObj *pDnode = NULL; + pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); + if (pIter == NULL) break; + + if (strncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) { + sdbCancelFetch(pSdb, pIter); + return pDnode; + } + } + + return NULL; +} + +static int32_t mndGetDnodeSize(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + return sdbGetSize(pSdb, SDB_DNODE); +} + +static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) { + SSdb *pSdb = pMnode->pSdb; + + int32_t i = 0; + void *pIter = NULL; + while (1) { + SDnodeObj *pDnode = NULL; + pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); + if (pIter == NULL) break; + if (i >= numOfEps) { + sdbCancelFetch(pSdb, pIter); + break; + } + + SDnodeEp *pEp = &pEps->eps[i]; + pEp->id = htonl(pDnode->id); + pEp->port = htons(pDnode->port); + memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + pEp->isMnode = 0; + if (mndIsMnode(pMnode, pDnode->id)) { + pEp->isMnode = 1; + } + i++; + } + + pEps->num = i; +} + +static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { + if (pCfg->mnodeEqualVnodeNum != pMnode->mnodeEqualVnodeNum) { + mError("\"mnodeEqualVnodeNum\"[%d - %d] cfg inconsistent", pCfg->mnodeEqualVnodeNum, pMnode->mnodeEqualVnodeNum); + return DND_REASON_MN_EQUAL_VN_NOT_MATCH; + } + + if (pCfg->statusInterval != pMnode->statusInterval) { + mError("\"statusInterval\"[%d - %d] cfg inconsistent", pCfg->statusInterval, pMnode->statusInterval); + return DND_REASON_STATUS_INTERVAL_NOT_MATCH; + } + + int64_t checkTime = 0; + char timestr[32] = "1970-01-01 00:00:00.00"; + (void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + if ((0 != strcasecmp(pCfg->timezone, pMnode->timezone)) && (checkTime != pCfg->checkTime)) { + mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, tsTimezone, + pCfg->checkTime, checkTime); + return DND_REASON_TIME_ZONE_NOT_MATCH; + } + + if (0 != strcasecmp(pCfg->locale, pMnode->locale)) { + mError("\"locale\"[%s - %s] cfg parameters inconsistent", pCfg->locale, pMnode->locale); + return DND_REASON_LOCALE_NOT_MATCH; + } + + if (0 != strcasecmp(pCfg->charset, pMnode->charset)) { + mError("\"charset\"[%s - %s] cfg parameters inconsistent.", pCfg->charset, pMnode->charset); + return DND_REASON_CHARSET_NOT_MATCH; + } + + return 0; +} + +static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { + SStatusMsg *pStatus = pMsg->rpcMsg.pCont; + pStatus->sver = htonl(pStatus->sver); + pStatus->dnodeId = htonl(pStatus->dnodeId); + pStatus->clusterId = htobe64(pStatus->clusterId); + pStatus->rebootTime = htonl(pStatus->rebootTime); + pStatus->numOfCores = htons(pStatus->numOfCores); + pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes); + pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes); + pStatus->numOfSupportQnodes = htons(pStatus->numOfSupportQnodes); + + pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); + pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pStatus->clusterCfg.mnodeEqualVnodeNum); + pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); + + SDnodeObj *pDnode = NULL; + if (pStatus->dnodeId == 0) { + pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); + if (pDnode == NULL) { + mDebug("dnode:%s, not created yet", pStatus->dnodeEp); + return TSDB_CODE_MND_DNODE_NOT_EXIST; + } + } else { + pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); + if (pDnode == NULL) { + pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); + if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { + pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; + } + mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); + mndReleaseDnode(pMnode, pDnode); + return TSDB_CODE_MND_DNODE_NOT_EXIST; + } + } + + if (pStatus->sver != pMnode->sver) { + if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { + pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; + } + mndReleaseDnode(pMnode, pDnode); + mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->sver); + return TSDB_CODE_MND_INVALID_MSG_VERSION; + } + + int64_t clusterId = mndGetClusterId(pMnode); + if (pStatus->dnodeId == 0) { + mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, clusterId); + } else { + if (pStatus->clusterId != clusterId) { + if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { + pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; + } + mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, clusterId); + mndReleaseDnode(pMnode, pDnode); + return TSDB_CODE_MND_INVALID_CLUSTER_ID; + } else { + pDnode->accessTimes++; + mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); + } + } + + if (pDnode->status == DND_STATUS_OFFLINE) { + // Verify whether the cluster parameters are consistent when status change from offline to ready + int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); + if (0 != ret) { + pDnode->offlineReason = ret; + mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); + mndReleaseDnode(pMnode, pDnode); + return TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT; + } + + mInfo("dnode:%d, from offline to online", pDnode->id); + } + + pDnode->rebootTime = pStatus->rebootTime; + pDnode->numOfCores = pStatus->numOfCores; + pDnode->numOfSupportMnodes = pStatus->numOfSupportMnodes; + pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; + pDnode->numOfSupportQnodes = pStatus->numOfSupportQnodes; + pDnode->status = DND_STATUS_READY; + + int32_t numOfEps = mndGetDnodeSize(pMnode); + int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); + SStatusRsp *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + mndReleaseDnode(pMnode, pDnode); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); + pRsp->dnodeCfg.dropped = 0; + pRsp->dnodeCfg.clusterId = htobe64(clusterId); + mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); + + pMsg->rpcRsp.len = contLen; + pMsg->rpcRsp.rsp = pRsp; + mndReleaseDnode(pMnode, pDnode); + + return 0; +} + +static int32_t mndProcessCreateDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessConfigDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +int32_t mndInitDnode(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_DNODE, + .keyType = SDB_KEY_INT32, + .deployFp = (SdbDeployFp)mndCreateDefaultDnode, + .encodeFp = (SdbEncodeFp)mndDnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndDnodeActionDecode, + .insertFp = (SdbInsertFp)mndDnodeActionInsert, + .updateFp = (SdbUpdateFp)mndDnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndDnodeActionDelete}; + + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS_RSP, mndProcessStatusMsg); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupDnode(SMnode *pMnode) {} + +SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) { + SSdb *pSdb = pMnode->pSdb; + return sdbAcquire(pSdb, SDB_DNODE, &dnodeId); +} + +void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pDnode); +} diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index c2e05687c7..bbf39dcff9 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -14,11 +14,120 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndTrans.h" -int32_t mndInitMnode(SMnode *pMnode) { return 0; } -void mndCleanupMnode(SMnode *pMnode) {} +#define SDB_MNODE_VER 1 -void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {} -void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {} \ No newline at end of file +static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pMnodeObj) { + SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, SDB_MNODE_VER, sizeof(SMnodeObj)); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, pMnodeObj->id); + SDB_SET_INT64(pRaw, dataPos, pMnodeObj->createdTime) + SDB_SET_INT64(pRaw, dataPos, pMnodeObj->updateTime) + + return pRaw; +} + +static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_MNODE_VER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode mnode since %s", terrstr()); + return NULL; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj)); + SMnodeObj *pMnodeObj = sdbGetRowObj(pRow); + if (pMnodeObj == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &pMnodeObj->id) + SDB_GET_INT64(pRaw, pRow, dataPos, &pMnodeObj->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pMnodeObj->updateTime) + + return pRow; +} + +static void mnodeResetMnode(SMnodeObj *pMnodeObj) { + pMnodeObj->role = TAOS_SYNC_STATE_FOLLOWER; + pMnodeObj->roleTerm = 0; + pMnodeObj->roleTime = 0; +} + +static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) { + pMnodeObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pMnodeObj->id); + if (pMnodeObj->pDnode == NULL) { + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + return -1; + } + + mnodeResetMnode(pMnodeObj); + return 0; +} + +static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) { + if (pMnodeObj->pDnode != NULL) { + sdbRelease(pSdb, pMnodeObj->pDnode); + pMnodeObj->pDnode = NULL; + } + + return 0; +} + +static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj *pDstMnode) { + pSrcMnode->id = pDstMnode->id; + pSrcMnode->createdTime = pDstMnode->createdTime; + pSrcMnode->updateTime = pDstMnode->updateTime; + mnodeResetMnode(pSrcMnode); +} + +static int32_t mndCreateDefaultMnode(SMnode *pMnode) { + SMnodeObj mnodeObj = {0}; + mnodeObj.id = 0; + mnodeObj.createdTime = taosGetTimestampMs(); + mnodeObj.updateTime = mnodeObj.createdTime; + + SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj); + if (pRaw == NULL) return -1; + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + + return sdbWrite(pMnode->pSdb, pRaw); +} + +static int32_t mndProcessCreateMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessDropMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +int32_t mndInitMnode(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_MNODE, + .keyType = SDB_KEY_INT32, + .deployFp = (SdbDeployFp)mndCreateDefaultMnode, + .encodeFp = (SdbEncodeFp)mndMnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndMnodeActionDecode, + .insertFp = (SdbInsertFp)mndMnodeActionInsert, + .updateFp = (SdbUpdateFp)mndMnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndMnodeActionDelete}; + + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeMsg); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupMnode(SMnode *pMnode) {} + +bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) { + SSdb *pSdb = pMnode->pSdb; + + SMnodeObj *pMnodeObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId); + if (pMnodeObj == NULL) { + return false; + } + + sdbRelease(pSdb, pMnodeObj); + return true; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index d63e3662e0..4156d2ab37 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -21,7 +21,7 @@ #define SDB_USER_VER 1 static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SAcctObj)); + SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SUserObj)); if (pRaw == NULL) return NULL; int32_t dataPos = 0; @@ -97,7 +97,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDs return 0; } -static int32_t mndCreateDefaultUser(SSdb *pSdb, char *acct, char *user, char *pass) { +static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char *pass) { SUserObj userObj = {0}; tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.acct, acct, TSDB_USER_LEN); @@ -113,15 +113,15 @@ static int32_t mndCreateDefaultUser(SSdb *pSdb, char *acct, char *user, char *pa if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - return sdbWrite(pSdb, pRaw); + return sdbWrite(pMnode->pSdb, pRaw); } -static int32_t mndCreateDefaultUsers(SSdb *pSdb) { - if (mndCreateDefaultUser(pSdb, TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { +static int32_t mndCreateDefaultUsers(SMnode *pMnode) { + if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { return -1; } - if (mndCreateDefaultUser(pSdb, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { + if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { return -1; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index bc2ed92197..e1c7b66c36 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -225,15 +225,22 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; + pMnode->sver = pOption->sver; + pMnode->statusInterval = pOption->statusInterval; + pMnode->mnodeEqualVnodeNum = pOption->mnodeEqualVnodeNum; + pMnode->timezone = strdup(pOption->timezone); + pMnode->locale = strdup(pOption->locale); + pMnode->charset = strdup(pOption->charset); if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || - pMnode->putMsgToApplyMsgFp == NULL) { + pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || + pMnode->statusInterval < 1 || pOption->mnodeEqualVnodeNum < 0) { terrno = TSDB_CODE_MND_APP_ERROR; return -1; } - if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) { - terrno = TSDB_CODE_MND_APP_ERROR; + if (pMnode->timezone == NULL || pMnode->locale == NULL || pMnode->charset == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -299,6 +306,9 @@ void mndClose(SMnode *pMnode) { mDebug("start to close mnode"); mndCleanupSteps(pMnode, -1); tfree(pMnode->path); + tfree(pMnode->charset); + tfree(pMnode->locale); + tfree(pMnode->timezone); tfree(pMnode); mDebug("mnode is closed"); } diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 6162617568..e492f28557 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -38,8 +38,8 @@ extern "C" { typedef struct SSdbRaw { int8_t type; - int8_t sver; int8_t status; + int8_t sver; int8_t reserved; int32_t dataLen; char pData[]; @@ -53,6 +53,7 @@ typedef struct SSdbRow { } SSdbRow; typedef struct SSdb { + SMnode *pMnode; char *currDir; char *syncDir; char *tmpDir; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 4f1267498c..1ceb3862ee 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -27,7 +27,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { } char path[PATH_MAX + 100]; - snprintf(path, PATH_MAX + 100, "%s", pOption->path); + snprintf(path, PATH_MAX + 100, "%s%sdata", pOption->path, TD_DIRSEP); pSdb->currDir = strdup(path); snprintf(path, PATH_MAX + 100, "%s%ssync", pOption->path, TD_DIRSEP); pSdb->syncDir = strdup(path); @@ -44,6 +44,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { taosInitRWLatch(&pSdb->locks[i]); } + pSdb->pMnode = pOption->pMnode; mDebug("sdb init successfully"); return pSdb; } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 37fcdc19ef..b285675b85 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -46,7 +46,7 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) { SdbDeployFp fp = pSdb->deployFps[i]; if (fp == NULL) continue; - if ((*fp)(pSdb) != 0) { + if ((*fp)(pSdb->pMnode) != 0) { mError("failed to deploy sdb:%d since %s", i, terrstr()); return -1; } diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 2abff74168..7ed1a427f5 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -61,6 +61,21 @@ int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) { return 0; } +int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int16_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *(int16_t *)(pRaw->pData + dataPos) = val; + return 0; +} + int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val) { if (pRaw == NULL) { terrno = TSDB_CODE_INVALID_PTR; @@ -146,6 +161,21 @@ int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) { return 0; } +int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int16_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *val = *(int16_t *)(pRaw->pData + dataPos); + return 0; +} + int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val) { if (pRaw == NULL) { terrno = TSDB_CODE_INVALID_PTR;