diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h new file mode 100644 index 0000000000..5002ee37b0 --- /dev/null +++ b/include/dnode/mgmt/dnode.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DNODE_H_ +#define _TD_DNODE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +/* ------------------------ TYPES EXPOSED ------------------------ */ +typedef struct SDnode SDnode; + +/* ------------------------ SDnode ------------------------ */ +/** + * @brief Initialize and start the dnode. + * + * @param cfgPath Config file path. + * @return SDnode* The dnode object. + */ +SDnode *dnodeInit(const char *cfgPath); + +/** + * @brief Stop and cleanup dnode. + * + * @param pDnode The dnode object to close. + */ +void dnodeCleanup(SDnode *pDnode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DNODE_H_*/ diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 1ef3bd579f..0071296bc1 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -20,17 +20,16 @@ extern "C" { #endif -typedef enum { MN_MSG_TYPE_WRITE = 1, MN_MSG_TYPE_APPLY, MN_MSG_TYPE_SYNC, MN_MSG_TYPE_READ } EMnMsgType; - +/* ------------------------ TYPES EXPOSED ------------------------ */ +typedef struct SDnode SDnode; +typedef struct SMnode SMnode; typedef struct SMnodeMsg SMnodeMsg; +typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg, bool forShell); +typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg); -typedef struct { - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; -} SMnodeCfg; - -typedef struct { +typedef struct SMnodeLoad { int64_t numOfDnode; int64_t numOfMnode; int64_t numOfVgroup; @@ -43,38 +42,126 @@ typedef struct { int64_t compStorage; } SMnodeLoad; -typedef struct SMnode SMnode; -typedef struct SServer SServer; - -typedef void (*SendMsgToDnodeFp)(SServer *pServer, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SServer *pServer, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SServer *pServer, struct SRpcMsg *rpcMsg, bool forShell); -typedef int32_t (*PutMsgToMnodeQFp)(SServer *pServer, SMnodeMsg *pMsg); - typedef struct { int32_t dnodeId; int64_t clusterId; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + struct SServer *pServer; PutMsgToMnodeQFp putMsgToApplyMsgFp; SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; -} SMnodePara; +} SMnodeOptions; -SMnode* mnodeCreate(SMnodePara para); -void mnodeCleanup(); +/* ------------------------ SMnode ------------------------ */ +/** + * @brief Open a mnode. + * + * @param path Path of the mnode + * @param pOptions Options of the mnode + * @return SMnode* The mnode object + */ +SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions); -int32_t mnodeDeploy(SMnodeCfg *pCfg); -void mnodeUnDeploy(); -int32_t mnodeStart(SMnodeCfg *pCfg); -int32_t mnodeAlter(SMnodeCfg *pCfg); -void mnodeStop(); +/** + * @brief Close a mnode + * + * @param pMnode The mnode object to close + */ +void mnodeClose(SMnode *pMnode); -int32_t mnodeGetLoad(SMnodeLoad *pLoad); -int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); +/** + * @brief Close a mnode + * + * @param pMnode The mnode object to close + * @param pOptions Options of the mnode + * @return int32_t 0 for success, -1 for failure + */ +int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions); -SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg); -void mnodeCleanupMsg(SMnodeMsg *pMsg); -void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType); +/** + * @brief Drop a mnode. + * + * @param path Path of the mnode. + */ +void mnodeDestroy(const char *path); + +/** + * @brief Get mnode statistics info + * + * @param pMnode The mnode object + * @param pLoad Statistics of the mnode. + * @return int32_t 0 for success, -1 for failure + */ +int32_t mnodeGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); + +/** + * @brief Get user authentication info + * + * @param pMnode The mnode object + * @param user + * @param spi + * @param encrypt + * @param secret + * @param ckey + * @return int32_t 0 for success, -1 for failure + */ +int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); + +/** + * @brief Initialize mnode msg + * + * @param pMnode The mnode object + * @param pMsg The request rpc msg + * @return int32_t The created mnode msg + */ +SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg); + +/** + * @brief Cleanup mnode msg + * + * @param pMnode The mnode object + * @param pMsg The request msg + */ +void mnodeCleanupMsg(SMnode *pMnode, SMnodeMsg *pMsg); + +/** + * @brief Process the read request + * + * @param pMnode The mnode object + * @param pMsg The request msg + * @return int32_t 0 for success, -1 for failure + */ +void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg); + +/** + * @brief Process the write request + * + * @param pMnode The mnode object + * @param pMsg The request msg + * @return int32_t 0 for success, -1 for failure + */ +void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg); + +/** + * @brief Process the sync request + * + * @param pMnode The mnode object + * @param pMsg The request msg + * @return int32_t 0 for success, -1 for failure + */ +void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg); + +/** + * @brief Process the apply request + * + * @param pMnode The mnode object + * @param pMsg The request msg + * @return int32_t 0 for success, -1 for failure + */ +void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg); #ifdef __cplusplus } diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 419c9dfcfc..36f6a3b6fb 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -184,11 +184,11 @@ typedef struct { SRpcMsg rpcMsg[]; } SVnodeMsg; -typedef struct SServer SServer; -typedef void (*SendMsgToDnodeFp)(SServer *pServer, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SServer *pServer, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SServer *pServer, struct SRpcMsg *rpcMsg, bool forShell); -typedef int32_t (*PutMsgToVnodeQFp)(SServer *pServer, int32_t vgId, SVnodeMsg *pMsg); +typedef struct SDnode SDnode; +typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg, bool forShell); +typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); typedef struct { PutMsgToVnodeQFp putMsgToApplyQueueFp; diff --git a/source/dnode/CMakeLists.txt b/source/dnode/CMakeLists.txt index d719a2d106..af132dea80 100644 --- a/source/dnode/CMakeLists.txt +++ b/source/dnode/CMakeLists.txt @@ -1,4 +1,4 @@ add_subdirectory(mnode) add_subdirectory(vnode) add_subdirectory(qnode) -add_subdirectory(mgmt) +add_subdirectory(mgmt) \ No newline at end of file diff --git a/source/dnode/mgmt/CMakeLists.txt b/source/dnode/mgmt/CMakeLists.txt index 194c317991..64e8980219 100644 --- a/source/dnode/mgmt/CMakeLists.txt +++ b/source/dnode/mgmt/CMakeLists.txt @@ -1,16 +1,2 @@ -aux_source_directory(src DNODE_SRC) -add_executable(taosd ${DNODE_SRC}) -target_link_libraries( - taosd - PUBLIC cjson - PUBLIC mnode - PUBLIC vnode - PUBLIC wal - PUBLIC sync - PUBLIC taos -) -target_include_directories( - taosd - PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode" - private "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) +add_subdirectory(daemon) +add_subdirectory(impl) \ No newline at end of file diff --git a/source/dnode/mgmt/daemon/CMakeLists.txt b/source/dnode/mgmt/daemon/CMakeLists.txt new file mode 100644 index 0000000000..f1ce726d85 --- /dev/null +++ b/source/dnode/mgmt/daemon/CMakeLists.txt @@ -0,0 +1,8 @@ +aux_source_directory(src DAEMON_SRC) +add_executable(taosd ${DAEMON_SRC}) +target_link_libraries( + taosd + PUBLIC dnode + PUBLIC util + PUBLIC os +) diff --git a/source/dnode/mgmt/src/dnode.c b/source/dnode/mgmt/daemon/src/daemon.c similarity index 78% rename from source/dnode/mgmt/src/dnode.c rename to source/dnode/mgmt/daemon/src/daemon.c index 1fbeb1e732..720d1589c2 100644 --- a/source/dnode/mgmt/src/dnode.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -14,10 +14,14 @@ */ #define _DEFAULT_SOURCE -#include "dnodeInt.h" +#include "dnode.h" +#include "os.h" +#include "ulog.h" static bool stop = false; + static void sigintHandler(int32_t signum, void *info, void *ctx) { stop = true; } + static void setSignalHandler() { taosSetSignal(SIGTERM, sigintHandler); taosSetSignal(SIGHUP, sigintHandler); @@ -27,20 +31,23 @@ static void setSignalHandler() { } int main(int argc, char const *argv[]) { - setSignalHandler(); + const char *path = "/etc/taos"; - int32_t code = dnodeInit(); - if (code != 0) { - dInfo("Failed to start TDengine, please check the log at:%s", tsLogDir); + SDnode *pDnode = dnodeInit(path); + if (pDnode == NULL) { + uInfo("Failed to start TDengine, please check the log at %s", tsLogDir); exit(EXIT_FAILURE); } + uInfo("Started TDengine service successfully."); + + setSignalHandler(); while (!stop) { taosMsleep(100); } - dInfo("TDengine is shut down!"); - dnodeCleanup(); + uInfo("TDengine is shut down!"); + dnodeCleanup(pDnode); return 0; } diff --git a/source/dnode/mgmt/impl/CMakeLists.txt b/source/dnode/mgmt/impl/CMakeLists.txt new file mode 100644 index 0000000000..b061d75731 --- /dev/null +++ b/source/dnode/mgmt/impl/CMakeLists.txt @@ -0,0 +1,16 @@ +aux_source_directory(src DNODE_SRC) +add_library(dnode ${DNODE_SRC}) +target_link_libraries( + dnode + PUBLIC cjson + PUBLIC mnode + PUBLIC vnode + PUBLIC wal + PUBLIC sync + PUBLIC taos +) +target_include_directories( + dnode + PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mgmt" + private "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) diff --git a/source/dnode/mgmt/inc/dnodeDnode.h b/source/dnode/mgmt/impl/inc/dnodeDnode.h similarity index 87% rename from source/dnode/mgmt/inc/dnodeDnode.h rename to source/dnode/mgmt/impl/inc/dnodeDnode.h index 0d1e93e60f..87dc0fdb9b 100644 --- a/source/dnode/mgmt/inc/dnodeDnode.h +++ b/source/dnode/mgmt/impl/inc/dnodeDnode.h @@ -21,8 +21,8 @@ extern "C" { #endif #include "dnodeInt.h" -int32_t dnodeInitDnode(); -void dnodeCleanupDnode(); +int32_t dnodeInitDnode(SDnode *pDnode); +void dnodeCleanupDnode(SDnode *pDnode); void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dnodeGetDnodeId(); @@ -30,7 +30,7 @@ int64_t dnodeGetClusterId(); void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); void dnodeGetMnodeEpSetForPeer(SEpSet *epSet); void dnodeGetMnodeEpSetForShell(SEpSet *epSet); -void dnodeSendRedirectMsg(SServer *pServer, SRpcMsg *rpcMsg, bool forShell); +void dnodeSendRedirectMsg(SDnode *pDnode, SRpcMsg *rpcMsg, bool forShell); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/inc/dnodeInt.h b/source/dnode/mgmt/impl/inc/dnodeInt.h new file mode 100644 index 0000000000..8944755268 --- /dev/null +++ b/source/dnode/mgmt/impl/inc/dnodeInt.h @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DNODE_INT_H_ +#define _TD_DNODE_INT_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "os.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "thash.h" +#include "tlog.h" +#include "trpc.h" +#include "tthread.h" +#include "ttime.h" + +extern int32_t dDebugFlag; + +#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("SRV FATAL ", 255, __VA_ARGS__); }} +#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("SRV ERROR ", 255, __VA_ARGS__); }} +#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("SRV WARN ", 255, __VA_ARGS__); }} +#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("SRV ", 255, __VA_ARGS__); }} +#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }} +#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }} + +typedef enum { DN_STAT_INIT, DN_STAT_RUNNING, DN_STAT_STOPPED } EStat; +typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet); + +typedef struct { + char *dnode; + char *mnode; + char *vnodes; +} SDnodeDir; + +typedef struct { + int32_t dnodeId; + int64_t clusterId; + SDnodeEps *dnodeEps; + SHashObj *dnodeHash; + SEpSet mnodeEpSetForShell; + SEpSet mnodeEpSetForPeer; + char *file; + uint32_t rebootTime; + int8_t dropped; + int8_t threadStop; + pthread_t *threadId; + pthread_mutex_t mutex; +} SDnodeDnode; + +typedef struct { +} SDnodeMnode; + +typedef struct { +} SDnodeVnodes; + +typedef struct { + void *peerRpc; + void *shellRpc; + void *clientRpc; +} SDnodeTrans; + +typedef struct SDnode { + EStat stat; + SDnodeDir dir; + SDnodeDnode dnode; + SDnodeVnodes vnodes; + SDnodeMnode mnode; + SDnodeTrans trans; + SStartupMsg startup; +} SDnode; + +EStat dnodeGetStat(SDnode *pDnode); +void dnodeSetStat(SDnode *pDnode, EStat stat); +char *dnodeStatStr(EStat stat); + +void dnodeReportStartup(SDnode *pDnode, char *name, char *desc); +void dnodeGetStartup(SDnode *pDnode, SStartupMsg *pStartup); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DNODE_INT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/inc/dnodeMnode.h b/source/dnode/mgmt/impl/inc/dnodeMnode.h similarity index 100% rename from source/dnode/mgmt/inc/dnodeMnode.h rename to source/dnode/mgmt/impl/inc/dnodeMnode.h diff --git a/source/dnode/mgmt/inc/dnodeTransport.h b/source/dnode/mgmt/impl/inc/dnodeTransport.h similarity index 85% rename from source/dnode/mgmt/inc/dnodeTransport.h rename to source/dnode/mgmt/impl/inc/dnodeTransport.h index 4a9518fe09..7d3f4be1ff 100644 --- a/source/dnode/mgmt/inc/dnodeTransport.h +++ b/source/dnode/mgmt/impl/inc/dnodeTransport.h @@ -23,8 +23,8 @@ extern "C" { int32_t dnodeInitTrans(); void dnodeCleanupTrans(); -void dnodeSendMsgToMnode(SServer *pServer, SRpcMsg *rpcMsg); -void dnodeSendMsgToDnode(SServer *pServer, SEpSet *epSet, SRpcMsg *rpcMsg); +void dnodeSendMsgToMnode(SDnode *pDnode, SRpcMsg *rpcMsg); +void dnodeSendMsgToDnode(SDnode *pDnode, SEpSet *epSet, SRpcMsg *rpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dnodeVnodes.h b/source/dnode/mgmt/impl/inc/dnodeVnodes.h similarity index 100% rename from source/dnode/mgmt/inc/dnodeVnodes.h rename to source/dnode/mgmt/impl/inc/dnodeVnodes.h diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/impl/src/dnodeDnode.c similarity index 65% rename from source/dnode/mgmt/src/dnodeDnode.c rename to source/dnode/mgmt/impl/src/dnodeDnode.c index 8a326c72d5..ec60116ce6 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/impl/src/dnodeDnode.c @@ -18,67 +18,49 @@ #include "dnodeTransport.h" #include "dnodeVnodes.h" #include "cJSON.h" -#include "thash.h" -#include "tthread.h" -#include "ttime.h" - -static struct { - int32_t dnodeId; - int64_t clusterId; - SDnodeEps *dnodeEps; - SHashObj *dnodeHash; - SEpSet mnodeEpSetForShell; - SEpSet mnodeEpSetForPeer; - char file[PATH_MAX + 20]; - uint32_t rebootTime; - int8_t dropped; - int8_t threadStop; - pthread_t *threadId; - pthread_mutex_t mutex; -} tsDnode = {0}; int32_t dnodeGetDnodeId() { int32_t dnodeId = 0; - pthread_mutex_lock(&tsDnode.mutex); - dnodeId = tsDnode.dnodeId; - pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); + dnodeId = pDnode->dnodeId; + pthread_mutex_unlock(&pDnode->mutex); return dnodeId; } int64_t dnodeGetClusterId() { int64_t clusterId = 0; - pthread_mutex_lock(&tsDnode.mutex); - clusterId = tsDnode.clusterId; - pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); + clusterId = pDnode->clusterId; + pthread_mutex_unlock(&pDnode->mutex); return clusterId; } void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) { - pthread_mutex_lock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); - SDnodeEp *pEp = taosHashGet(tsDnode.dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *pEp = taosHashGet(pDnode->dnodeHash, &dnodeId, sizeof(int32_t)); if (pEp != NULL) { if (port) *port = pEp->dnodePort; if (fqdn) tstrncpy(fqdn, pEp->dnodeFqdn, TSDB_FQDN_LEN); if (ep) snprintf(ep, TSDB_EP_LEN, "%s:%u", pEp->dnodeFqdn, pEp->dnodePort); } - pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_unlock(&pDnode->mutex); } void dnodeGetMnodeEpSetForPeer(SEpSet *pEpSet) { - pthread_mutex_lock(&tsDnode.mutex); - *pEpSet = tsDnode.mnodeEpSetForPeer; - pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); + *pEpSet = pDnode->mnodeEpSetForPeer; + pthread_mutex_unlock(&pDnode->mutex); } void dnodeGetMnodeEpSetForShell(SEpSet *pEpSet) { - pthread_mutex_lock(&tsDnode.mutex); - *pEpSet = tsDnode.mnodeEpSetForShell; - pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); + *pEpSet = pDnode->mnodeEpSetForShell; + pthread_mutex_unlock(&pDnode->mutex); } -void dnodeSendRedirectMsg(SServer *pServer, SRpcMsg *pMsg, bool forShell) { +void dnodeSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg, bool forShell) { int32_t msgType = pMsg->msgType; SEpSet epSet = {0}; @@ -106,7 +88,7 @@ void dnodeSendRedirectMsg(SServer *pServer, SRpcMsg *pMsg, bool forShell) { rpcSendRedirectRsp(pMsg->handle, &epSet); } -static void dnodeUpdateMnodeEpSet(SEpSet *pEpSet) { +static void dnodeUpdateMnodeEpSet(SDnodeDnode *pDnode, SEpSet *pEpSet) { if (pEpSet == NULL || pEpSet->numOfEps <= 0) { dError("mnode is changed, but content is invalid, discard it"); return; @@ -114,22 +96,22 @@ static void dnodeUpdateMnodeEpSet(SEpSet *pEpSet) { dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); } - pthread_mutex_lock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); - tsDnode.mnodeEpSetForPeer = *pEpSet; + pDnode->mnodeEpSetForPeer = *pEpSet; for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); } - tsDnode.mnodeEpSetForShell = *pEpSet; + pDnode->mnodeEpSetForShell = *pEpSet; - pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_unlock(&pDnode->mutex); } static void dnodePrintDnodes() { - dDebug("print dnode endpoint list, num:%d", tsDnode.dnodeEps->dnodeNum); - for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; i++) { - SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i]; + dDebug("print dnode endpoint list, num:%d", pDnode->dnodeEps->dnodeNum); + for (int32_t i = 0; i < pDnode->dnodeEps->dnodeNum; i++) { + SDnodeEp *ep = &pDnode->dnodeEps->dnodeEps[i]; dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode); } } @@ -138,36 +120,36 @@ static void dnodeResetDnodes(SDnodeEps *pEps) { assert(pEps != NULL); int32_t size = sizeof(SDnodeEps) + pEps->dnodeNum * sizeof(SDnodeEp); - if (pEps->dnodeNum > tsDnode.dnodeEps->dnodeNum) { + if (pEps->dnodeNum > pDnode->dnodeEps->dnodeNum) { SDnodeEps *tmp = calloc(1, size); if (tmp == NULL) return; - tfree(tsDnode.dnodeEps); - tsDnode.dnodeEps = tmp; + tfree(pDnode->dnodeEps); + pDnode->dnodeEps = tmp; } - if (tsDnode.dnodeEps != pEps) { - memcpy(tsDnode.dnodeEps, pEps, size); + if (pDnode->dnodeEps != pEps) { + memcpy(pDnode->dnodeEps, pEps, size); } - tsDnode.mnodeEpSetForPeer.inUse = 0; - tsDnode.mnodeEpSetForShell.inUse = 0; + pDnode->mnodeEpSetForPeer.inUse = 0; + pDnode->mnodeEpSetForShell.inUse = 0; int32_t mIndex = 0; - for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; i++) { - SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i]; + for (int32_t i = 0; i < pDnode->dnodeEps->dnodeNum; i++) { + SDnodeEp *ep = &pDnode->dnodeEps->dnodeEps[i]; if (!ep->isMnode) continue; if (mIndex >= TSDB_MAX_REPLICA) continue; - strcpy(tsDnode.mnodeEpSetForShell.fqdn[mIndex], ep->dnodeFqdn); - strcpy(tsDnode.mnodeEpSetForPeer.fqdn[mIndex], ep->dnodeFqdn); - tsDnode.mnodeEpSetForShell.port[mIndex] = ep->dnodePort; - tsDnode.mnodeEpSetForShell.port[mIndex] = ep->dnodePort + tsDnodeDnodePort; + strcpy(pDnode->mnodeEpSetForShell.fqdn[mIndex], ep->dnodeFqdn); + strcpy(pDnode->mnodeEpSetForPeer.fqdn[mIndex], ep->dnodeFqdn); + pDnode->mnodeEpSetForShell.port[mIndex] = ep->dnodePort; + pDnode->mnodeEpSetForShell.port[mIndex] = ep->dnodePort + tsDnodeDnodePort; mIndex++; } - for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; ++i) { - SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i]; - taosHashPut(tsDnode.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); + for (int32_t i = 0; i < pDnode->dnodeEps->dnodeNum; ++i) { + SDnodeEp *ep = &pDnode->dnodeEps->dnodeEps[i]; + taosHashPut(pDnode->dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); } dnodePrintDnodes(); @@ -175,16 +157,16 @@ static void dnodeResetDnodes(SDnodeEps *pEps) { static bool dnodeIsEpChanged(int32_t dnodeId, char *epStr) { bool changed = false; - pthread_mutex_lock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); - SDnodeEp *pEp = taosHashGet(tsDnode.dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *pEp = taosHashGet(pDnode->dnodeHash, &dnodeId, sizeof(int32_t)); if (pEp != NULL) { char epSaved[TSDB_EP_LEN + 1]; snprintf(epSaved, TSDB_EP_LEN, "%s:%u", pEp->dnodeFqdn, pEp->dnodePort); changed = strcmp(epStr, epSaved) != 0; } - pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_unlock(&pDnode->mutex); return changed; } @@ -195,101 +177,101 @@ static int32_t dnodeReadDnodes() { cJSON *root = NULL; FILE *fp = NULL; - fp = fopen(tsDnode.file, "r"); + fp = fopen(pDnode->file, "r"); if (!fp) { - dDebug("file %s not exist", tsDnode.file); + dDebug("file %s not exist", pDnode->file); goto PRASE_DNODE_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", tsDnode.file); + dError("failed to read %s since content is null", pDnode->file); goto PRASE_DNODE_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", tsDnode.file); + dError("failed to read %s since invalid json format", pDnode->file); goto PRASE_DNODE_OVER; } cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s since dnodeId not found", tsDnode.file); + dError("failed to read %s since dnodeId not found", pDnode->file); goto PRASE_DNODE_OVER; } - tsDnode.dnodeId = atoi(dnodeId->valuestring); + pDnode->dnodeId = atoi(dnodeId->valuestring); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { - dError("failed to read %s since clusterId not found", tsDnode.file); + dError("failed to read %s since clusterId not found", pDnode->file); goto PRASE_DNODE_OVER; } - tsDnode.clusterId = atoll(clusterId->valuestring); + pDnode->clusterId = atoll(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_String) { - dError("failed to read %s since dropped not found", tsDnode.file); + dError("failed to read %s since dropped not found", pDnode->file); goto PRASE_DNODE_OVER; } - tsDnode.dropped = atoi(dropped->valuestring); + pDnode->dropped = atoi(dropped->valuestring); cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { - dError("failed to read %s since dnodeInfos not found", tsDnode.file); + dError("failed to read %s since dnodeInfos not found", pDnode->file); goto PRASE_DNODE_OVER; } int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); if (dnodeInfosSize <= 0) { - dError("failed to read %s since dnodeInfos size:%d invalid", tsDnode.file, dnodeInfosSize); + dError("failed to read %s since dnodeInfos size:%d invalid", pDnode->file, dnodeInfosSize); goto PRASE_DNODE_OVER; } - tsDnode.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); - if (tsDnode.dnodeEps == NULL) { + pDnode->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); + if (pDnode->dnodeEps == NULL) { dError("failed to calloc dnodeEpList since %s", strerror(errno)); goto PRASE_DNODE_OVER; } - tsDnode.dnodeEps->dnodeNum = dnodeInfosSize; + pDnode->dnodeEps->dnodeNum = dnodeInfosSize; for (int32_t i = 0; i < dnodeInfosSize; ++i) { cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); if (dnodeInfo == NULL) break; - SDnodeEp *pEp = &tsDnode.dnodeEps->dnodeEps[i]; + SDnodeEp *pEp = &pDnode->dnodeEps->dnodeEps[i]; cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s, dnodeId not found", tsDnode.file); + dError("failed to read %s, dnodeId not found", pDnode->file); goto PRASE_DNODE_OVER; } pEp->dnodeId = atoi(dnodeId->valuestring); cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); if (!isMnode || isMnode->type != cJSON_String) { - dError("failed to read %s, isMnode not found", tsDnode.file); + dError("failed to read %s, isMnode not found", pDnode->file); goto PRASE_DNODE_OVER; } pEp->isMnode = atoi(isMnode->valuestring); cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { - dError("failed to read %s, dnodeFqdn not found", tsDnode.file); + dError("failed to read %s, dnodeFqdn not found", pDnode->file); goto PRASE_DNODE_OVER; } tstrncpy(pEp->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); if (!dnodePort || dnodePort->type != cJSON_String) { - dError("failed to read %s, dnodePort not found", tsDnode.file); + dError("failed to read %s, dnodePort not found", pDnode->file); goto PRASE_DNODE_OVER; } pEp->dnodePort = atoi(dnodePort->valuestring); } - dInfo("succcessed to read file %s", tsDnode.file); + dInfo("succcessed to read file %s", pDnode->file); dnodePrintDnodes(); PRASE_DNODE_OVER: @@ -297,28 +279,28 @@ PRASE_DNODE_OVER: if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); - if (dnodeIsEpChanged(tsDnode.dnodeId, tsLocalEp)) { - dError("localEp %s different with %s and need reconfigured", tsLocalEp, tsDnode.file); + if (dnodeIsEpChanged(pDnode->dnodeId, tsLocalEp)) { + dError("localEp %s different with %s and need reconfigured", tsLocalEp, pDnode->file); return -1; } - if (tsDnode.dnodeEps == NULL) { - tsDnode.dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); - tsDnode.dnodeEps->dnodeNum = 1; - tsDnode.dnodeEps->dnodeEps[0].dnodePort = tsServerPort; - tstrncpy(tsDnode.dnodeEps->dnodeEps[0].dnodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN); + if (pDnode->dnodeEps == NULL) { + pDnode->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); + pDnode->dnodeEps->dnodeNum = 1; + pDnode->dnodeEps->dnodeEps[0].dnodePort = tsServerPort; + tstrncpy(pDnode->dnodeEps->dnodeEps[0].dnodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN); } - dnodeResetDnodes(tsDnode.dnodeEps); + dnodeResetDnodes(pDnode->dnodeEps); terrno = 0; return 0; } static int32_t dnodeWriteDnodes() { - FILE *fp = fopen(tsDnode.file, "w"); + FILE *fp = fopen(pDnode->file, "w"); if (!fp) { - dError("failed to write %s since %s", tsDnode.file, strerror(errno)); + dError("failed to write %s since %s", pDnode->file, strerror(errno)); return -1; } @@ -327,17 +309,17 @@ static int32_t dnodeWriteDnodes() { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsDnode.dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsDnode.clusterId); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsDnode.dropped); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnode->dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pDnode->clusterId); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pDnode->dropped); len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); - for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; ++i) { - SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i]; + for (int32_t i = 0; i < pDnode->dnodeEps->dnodeNum; ++i) { + SDnodeEp *ep = &pDnode->dnodeEps->dnodeEps[i]; len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId); len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode); len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn); len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort); - if (i < tsDnode.dnodeEps->dnodeNum - 1) { + if (i < pDnode->dnodeEps->dnodeNum - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { len += snprintf(content + len, maxLen - len, " }]\n"); @@ -351,7 +333,7 @@ static int32_t dnodeWriteDnodes() { free(content); terrno = 0; - dInfo("successed to write %s", tsDnode.file); + dInfo("successed to write %s", pDnode->file); return 0; } @@ -367,7 +349,7 @@ static void dnodeSendStatusMsg() { pStatus->sversion = htonl(tsVersion); pStatus->dnodeId = htonl(dnodeGetDnodeId()); pStatus->clusterId = htobe64(dnodeGetClusterId()); - pStatus->rebootTime = htonl(tsDnode.rebootTime); + pStatus->rebootTime = htonl(pDnode->rebootTime); pStatus->numOfCores = htonl(tsNumOfCores); tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN); @@ -387,37 +369,37 @@ static void dnodeSendStatusMsg() { } static void dnodeUpdateCfg(SDnodeCfg *pCfg) { - if (tsDnode.dnodeId == 0) return; - if (tsDnode.dropped) return; + if (pDnode->dnodeId == 0) return; + if (pDnode->dropped) return; - pthread_mutex_lock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); - tsDnode.dnodeId = pCfg->dnodeId; - tsDnode.clusterId = pCfg->clusterId; - tsDnode.dropped = pCfg->dropped; + pDnode->dnodeId = pCfg->dnodeId; + pDnode->clusterId = pCfg->clusterId; + pDnode->dropped = pCfg->dropped; dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, pCfg->dnodeId, pCfg->clusterId); dnodeWriteDnodes(); - pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_unlock(&pDnode->mutex); } static void dnodeUpdateDnodeEps(SDnodeEps *pEps) { if (pEps == NULL || pEps->dnodeNum <= 0) return; - pthread_mutex_lock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); - if (pEps->dnodeNum != tsDnode.dnodeEps->dnodeNum) { + if (pEps->dnodeNum != pDnode->dnodeEps->dnodeNum) { dnodeResetDnodes(pEps); dnodeWriteDnodes(); } else { int32_t size = pEps->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps); - if (memcmp(tsDnode.dnodeEps, pEps, size) != 0) { + if (memcmp(pDnode->dnodeEps, pEps, size) != 0) { dnodeResetDnodes(pEps); dnodeWriteDnodes(); } } - pthread_mutex_unlock(&tsDnode.mutex); + pthread_mutex_unlock(&pDnode->mutex); } static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { @@ -455,7 +437,7 @@ static void dnodeProcessStartupReq(SRpcMsg *pMsg) { dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); - dnodeGetStartup(pStartup); + dnodeGetStartup(NULL, pStartup); dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); @@ -467,8 +449,8 @@ static void dnodeProcessStartupReq(SRpcMsg *pMsg) { static void *dnodeThreadRoutine(void *param) { int32_t ms = tsStatusInterval * 1000; - while (!tsDnode.threadStop) { - if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { + while (!pDnode->threadStop) { + if (dnodeGetStat() != DN_STAT_RUNNING) { continue; } else { dnodeSendStatusMsg(); @@ -477,31 +459,41 @@ static void *dnodeThreadRoutine(void *param) { } } -int32_t dnodeInitDnode() { - tsDnode.dnodeId = 0; - tsDnode.clusterId = 0; - tsDnode.dnodeEps = NULL; - snprintf(tsDnode.file, sizeof(tsDnode.file), "%s/dnode.json", tsDnodeDir); - tsDnode.rebootTime = taosGetTimestampSec(); - tsDnode.dropped = 0; - pthread_mutex_init(&tsDnode.mutex, NULL); - tsDnode.threadStop = false; +int32_t dnodeInitDnode(SDnode *pServer) { + SDnodeDnode *pDnode = &pServer->dnode; - tsDnode.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (tsDnode.dnodeHash == NULL) { + char path[PATH_MAX]; + snprintf(path, PATH_MAX, "%s/dnode.json", pServer->dir.dnode); + pDnode->file = strdup(path); + if (pDnode->file == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pDnode->dnodeId = 0; + pDnode->clusterId = 0; + pDnode->dnodeEps = NULL; + + pDnode->rebootTime = taosGetTimestampSec(); + pDnode->dropped = 0; + pthread_mutex_init(&pDnode->mutex, NULL); + pDnode->threadStop = false; + + pDnode->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pDnode->dnodeHash == NULL) { dError("failed to init dnode hash"); return TSDB_CODE_DND_OUT_OF_MEMORY; } - tsDnode.threadId = taosCreateThread(dnodeThreadRoutine, NULL); - if (tsDnode.threadId == NULL) { + pDnode->threadId = taosCreateThread(dnodeThreadRoutine, NULL); + if (pDnode->threadId == NULL) { dError("failed to init dnode thread"); return TSDB_CODE_DND_OUT_OF_MEMORY; } int32_t code = dnodeReadDnodes(); if (code != 0) { - dError("failed to read file:%s since %s", tsDnode.file, tstrerror(code)); + dError("failed to read file:%s since %s", pDnode->file, tstrerror(code)); return code; } @@ -509,36 +501,38 @@ int32_t dnodeInitDnode() { return 0; } -void dnodeCleanupDnode() { - if (tsDnode.threadId != NULL) { - tsDnode.threadStop = true; - taosDestoryThread(tsDnode.threadId); - tsDnode.threadId = NULL; +void dnodeCleanupDnode(SDnode *pServer) { + SDnodeDnode *pDnode = &pServer->dnode; + + if (pDnode->threadId != NULL) { + pDnode->threadStop = true; + taosDestoryThread(pDnode->threadId); + pDnode->threadId = NULL; } - pthread_mutex_lock(&tsDnode.mutex); + pthread_mutex_lock(&pDnode->mutex); - if (tsDnode.dnodeEps != NULL) { - free(tsDnode.dnodeEps); - tsDnode.dnodeEps = NULL; + if (pDnode->dnodeEps != NULL) { + free(pDnode->dnodeEps); + pDnode->dnodeEps = NULL; } - if (tsDnode.dnodeHash) { - taosHashCleanup(tsDnode.dnodeHash); - tsDnode.dnodeHash = NULL; + if (pDnode->dnodeHash) { + taosHashCleanup(pDnode->dnodeHash); + pDnode->dnodeHash = NULL; } - pthread_mutex_unlock(&tsDnode.mutex); - pthread_mutex_destroy(&tsDnode.mutex); + pthread_mutex_unlock(&pDnode->mutex); + pthread_mutex_destroy(&pDnode->mutex); dInfo("dnode-dnode is cleaned up"); } -void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { +void dnodeProcessDnodeMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { int32_t msgType = pMsg->msgType; if (msgType == TSDB_MSG_TYPE_STATUS_RSP && pEpSet) { - dnodeUpdateMnodeEpSet(pEpSet); + dnodeUpdateMnodeEpSet(&pDnode->dnode, pEpSet); } switch (msgType) { diff --git a/source/dnode/mgmt/impl/src/dnodeInt.c b/source/dnode/mgmt/impl/src/dnodeInt.c new file mode 100644 index 0000000000..8641a54def --- /dev/null +++ b/source/dnode/mgmt/impl/src/dnodeInt.c @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dnodeDnode.h" +#include "dnodeMnode.h" +#include "dnodeTransport.h" +#include "dnodeVnodes.h" +#include "sync.h" +#include "tcache.h" +#include "tconfig.h" +#include "tnote.h" +#include "tstep.h" +#include "wal.h" + +EStat dnodeGetStat(SDnode *pDnode) { return pDnode->stat; } + +void dnodeSetStat(SDnode *pDnode, EStat stat) { + dDebug("dnode stat set from %s to %s", dnodeStatStr(pDnode->stat), dnodeStatStr(stat)); + pDnode->stat = stat; +} + +char *dnodeStatStr(EStat stat) { + switch (stat) { + case DN_STAT_INIT: + return "init"; + case DN_STAT_RUNNING: + return "running"; + case DN_STAT_STOPPED: + return "stopped"; + default: + return "unknown"; + } +} + +void dnodeReportStartup(SDnode *pDnode, char *name, char *desc) { + SStartupMsg *pStartup = &pDnode->startup; + tstrncpy(pStartup->name, name, strlen(pStartup->name)); + tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); + pStartup->finished = 0; +} + +void dnodeGetStartup(SDnode *pDnode, SStartupMsg *pStartup) { + memcpy(pStartup, &pDnode->startup, sizeof(SStartupMsg); + pStartup->finished = (dnodeGetStat(pDnode) == DN_STAT_RUNNING); +} + +static int32_t dnodeCheckRunning(char *dataDir) { + char filepath[PATH_MAX] = {0}; + snprintf(filepath, sizeof(filepath), "%s/.running", dataDir); + + FileFd fd = taosOpenFileCreateWriteTrunc(filepath); + if (fd < 0) { + dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + int32_t ret = taosLockFile(fd); + if (ret != 0) { + dError("failed to lock file:%s since %s, quit", filepath, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(fd); + return -1; + } + + return 0; +} + +static int32_t dnodeInitDisk(SDnode *pDnode, char *dataDir) { + char path[PATH_MAX]; + snprintf(path, PATH_MAX, "%s/mnode", dataDir); + pDnode->dir.mnode = strdup(path); + + sprintf(path, PATH_MAX, "%s/vnode", dataDir); + pDnode->dir.vnodes = strdup(path); + + sprintf(path, PATH_MAX, "%s/dnode", dataDir); + pDnode->dir.dnode = strdup(path); + + if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) { + dError("failed to malloc dir object"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (!taosMkDir(pDnode->dir.dnode)) { + dError("failed to create dir:%s since %s", pDnode->dir.dnode, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (!taosMkDir(pDnode->dir.mnode)) { + dError("failed to create dir:%s since %s", pDnode->dir.mnode, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (!taosMkDir(pDnode->dir.vnodes)) { + dError("failed to create dir:%s since %s", pDnode->dir.vnodes, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (dnodeCheckRunning(dataDir) != 0) { + return -1; + } + + return 0; +} + +static int32_t dnodeInitEnv(SDnode *pDnode, const char *cfgPath) { + taosIgnSIGPIPE(); + taosBlockSIGPIPE(); + taosResolveCRC(); + taosInitGlobalCfg(); + taosReadGlobalLogCfg(); + taosSetCoreDump(tsEnableCoreFile); + + if (!taosMkDir(tsLogDir)) { + printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); + return -1; + } + + char temp[TSDB_FILENAME_LEN]; + sprintf(temp, "%s/taosdlog", tsLogDir); + if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { + dError("failed to init log file\n"); + return -1; + } + + if (!taosReadGlobalCfg()) { + taosPrintGlobalCfg(); + dError("TDengine read global config failed"); + return -1; + } + + taosInitNotes(); + + if (taosCheckGlobalCfg() != 0) { + dError("TDengine check global config failed"); + return -1; + } + + if (dnodeInitDisk(pDnode, tsDataDir) != 0) { + dError("TDengine failed to init directory"); + return -1; + } + + return 0; +} + +static void dnodeCleanupEnv(SDnode *pDnode) { + if (pDnode->dir.mnode != NULL) { + tfree(pDnode->dir.mnode); + } + + if (pDnode->dir.vnodes != NULL) { + tfree(pDnode->dir.vnodes); + } + + if (pDnode->dir.dnode != NULL) { + tfree(pDnode->dir.dnode); + } + + taosCloseLog(); + taosStopCacheRefreshWorker(); +} + +SDnode *dnodeInit(const char *cfgPath) { + SDnode *pDnode = calloc(1, sizeof(pDnode)); + if (pDnode == NULL) { + dError("failed to create dnode object"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + dInfo("start to initialize TDengine"); + dnodeSetStat(pDnode, DN_STAT_INIT); + + if (dnodeInitEnv(pDnode, cfgPath) != 0) { + dError("failed to init env"); + dnodeCleanup(pDnode); + return NULL; + } + + if (rpcInit() != 0) { + dError("failed to init rpc env"); + dnodeCleanup(pDnode); + return NULL; + } + + if (walInit() != 0) { + dError("failed to init wal env"); + dnodeCleanup(pDnode); + return NULL; + } + + if (dnodeInitDnode(pDnode) != 0) { + dError("failed to init dnode"); + dnodeCleanup(pDnode); + return NULL; + } + + if (dnodeInitVnodes(pDnode) != 0) { + dError("failed to init vnodes"); + dnodeCleanup(pDnode); + return NULL; + } + + if (dnodeInitMnode(pDnode) != 0) { + dError("failed to init mnode"); + dnodeCleanup(pDnode); + return NULL; + } + + if (dnodeInitTrans(pDnode) != 0) { + dError("failed to init transport"); + dnodeCleanup(pDnode); + return NULL; + } + + dnodeSetStat(pDnode, DN_STAT_RUNNING); + dnodeReportStartup(pDnode, "TDengine", "initialized successfully"); + dInfo("TDengine is initialized successfully"); + + return 0; +} + +void dnodeCleanup(SDnode *pDnode) { + if (dnodeGetStat(pDnode) == DN_STAT_STOPPED) { + dError("dnode is shutting down"); + return; + } + + dInfo("start to cleanup TDengine"); + dnodeSetStat(pDnode, DN_STAT_STOPPED); + dnodeCleanupTrans(pDnode); + dnodeCleanupMnode(pDnode); + dnodeCleanupVnodes(pDnode); + dnodeCleanupDnode(pDnode); + walCleanUp(); + rpcCleanup(); + + dInfo("TDengine is cleaned up successfully"); + dnodeCleanupEnv(pDnode); + free(pDnode); +} diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/impl/src/dnodeMnode.c similarity index 96% rename from source/dnode/mgmt/src/dnodeMnode.c rename to source/dnode/mgmt/impl/src/dnodeMnode.c index 232af96897..06b28aeea9 100644 --- a/source/dnode/mgmt/src/dnodeMnode.c +++ b/source/dnode/mgmt/impl/src/dnodeMnode.c @@ -51,19 +51,24 @@ static void dnodeFreeMnodeApplyQueue(); static int32_t dnodeAllocMnodeSyncQueue(); static void dnodeFreeMnodeSyncQueue(); -static int32_t dnodeAcquireMnode() { +static SMnode *dnodeAcquireMnode() { + SMnode *pMnode = NULL; taosRLockLatch(&tsMnode.latch); - int32_t code = tsMnode.deployed ? 0 : TSDB_CODE_DND_MNODE_NOT_DEPLOYED; - if (code == 0) { + if (tsMnode.deployed) { atomic_add_fetch_32(&tsMnode.refCount, 1); + pMnode = tsMnode.pMnode; } taosRUnLockLatch(&tsMnode.latch); - return code; + return pMnode; } -static void dnodeReleaseMnode() { atomic_sub_fetch_32(&tsMnode.refCount, 1); } +static void dnodeReleaseMnode(SMnode *pMnode) { + taosRLockLatch(&tsMnode.latch); + atomic_sub_fetch_32(&tsMnode.refCount, 1); + taosRUnLockLatch(&tsMnode.latch); +} static int32_t dnodeReadMnodeFile() { int32_t code = TSDB_CODE_DND_READ_MNODE_FILE_ERROR; @@ -503,12 +508,12 @@ static void dnodeCleanupMnodeSyncWorker() { tWorkerCleanup(&tsMnode.syncPool); } static int32_t dnodeInitMnodeModule() { taosInitRWLatch(&tsMnode.latch); - SMnodePara para; + SMnodeOptions para; para.dnodeId = dnodeGetDnodeId(); para.clusterId = dnodeGetClusterId(); para.sendMsgToDnodeFp = dnodeSendMsgToDnode; para.sendMsgToMnodeFp = dnodeSendMsgToMnode; - para.sendMsgToMnodeFp = dnodeSendRedirectMsg; + para.sendRedirectMsgFp = dnodeSendRedirectMsg; tsMnode.pMnode = mnodeCreate(para); if (tsMnode.pMnode != NULL) { @@ -517,7 +522,7 @@ static int32_t dnodeInitMnodeModule() { return 0; } -static void dnodeCleanupMnodeModule() { mnodeCleanup(); } +static void dnodeCleanupMnodeModule() { mnodeDrop(NULL); } static bool dnodeNeedDeployMnode() { if (dnodeGetDnodeId() > 0) return false; @@ -590,13 +595,14 @@ void dnodeCleanupMnode() { } int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - int32_t code = dnodeAcquireMnode(); - if (code != 0) { + SMnode *pMnode = dnodeAcquireMnode(); + if (pMnode == NULL) { dTrace("failed to get user auth since mnode not deployed"); - return code; + terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; + return -1; } - code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); - dnodeReleaseMnode(); + int32_t code = mnodeRetriveAuth(pMnode, user, spi, encrypt, secret, ckey); + dnodeReleaseMnode(pMnode); return code; } \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/impl/src/dnodeTransport.c similarity index 97% rename from source/dnode/mgmt/src/dnodeTransport.c rename to source/dnode/mgmt/impl/src/dnodeTransport.c index b3263aadca..c1e8955625 100644 --- a/source/dnode/mgmt/src/dnodeTransport.c +++ b/source/dnode/mgmt/impl/src/dnodeTransport.c @@ -135,7 +135,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SEpSet *pEpSet) { return; } - if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { + if (dnodeGetStat() != DN_STAT_RUNNING) { rspMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); @@ -193,7 +193,7 @@ static void dnodeCleanupPeerServer() { static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SEpSet *pEpSet) { int32_t msgType = pMsg->msgType; - if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { + if (dnodeGetStat() == DN_STAT_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]); rpcFreeCont(pMsg->pCont); @@ -248,13 +248,13 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg rspMsg = {.handle = pMsg->handle}; int32_t msgType = pMsg->msgType; - if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { + if (dnodeGetStat() == DN_STAT_STOPPED) { dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]); rspMsg.code = TSDB_CODE_DND_EXITING; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); return; - } else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { + } else if (dnodeGetStat() != DN_STAT_RUNNING) { dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); rspMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rspMsg); @@ -382,13 +382,13 @@ void dnodeCleanupTrans() { dnodeCleanupClient(); } -void dnodeSendMsgToDnode(SServer *pServer, SEpSet *epSet, SRpcMsg *rpcMsg) { +void dnodeSendMsgToDnode(SDnode *pDnode, SEpSet *epSet, SRpcMsg *rpcMsg) { #if 0 rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); #endif } -void dnodeSendMsgToMnode(SServer *pServer, SRpcMsg *rpcMsg) { +void dnodeSendMsgToMnode(SDnode *pDnode, SRpcMsg *rpcMsg) { SEpSet epSet = {0}; dnodeGetMnodeEpSetForPeer(&epSet); dnodeSendMsgToDnode(NULL, &epSet, rpcMsg); diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/impl/src/dnodeVnodes.c similarity index 99% rename from source/dnode/mgmt/src/dnodeVnodes.c rename to source/dnode/mgmt/impl/src/dnodeVnodes.c index bd15850c42..7eaa82ba93 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/impl/src/dnodeVnodes.c @@ -815,7 +815,7 @@ void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { } } -static int32_t dnodePutMsgIntoVnodeApplyQueue(SServer *pServer, int32_t vgId, SVnodeMsg *pMsg) { +static int32_t dnodePutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg) { SVnodeObj *pVnode = dnodeAcquireVnode(vgId); if (pVnode == NULL) { return terrno; diff --git a/source/dnode/mgmt/inc/dnodeInt.h b/source/dnode/mgmt/inc/dnodeInt.h deleted file mode 100644 index 48da1ee558..0000000000 --- a/source/dnode/mgmt/inc/dnodeInt.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DNODE_INT_H_ -#define _TD_DNODE_INT_H_ - -#ifdef __cplusplus -extern "C" { -#endif -#include "os.h" -#include "taosmsg.h" -#include "tglobal.h" -#include "tlog.h" -#include "trpc.h" - -extern int32_t dDebugFlag; - -#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }} -#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }} -#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }} -#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", 255, __VA_ARGS__); }} -#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} -#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} - -typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat; -typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet); - -typedef struct SServer { -} SServer; - -int32_t dnodeInit(); -void dnodeCleanup(); - -EDnStat dnodeGetRunStat(); -void dnodeSetRunStat(EDnStat stat); - -void dnodeReportStartup(char *name, char *desc); -void dnodeReportStartupFinished(char *name, char *desc); -void dnodeGetStartup(SStartupMsg *); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DNODE_INT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeInt.c b/source/dnode/mgmt/src/dnodeInt.c deleted file mode 100644 index eee4bac050..0000000000 --- a/source/dnode/mgmt/src/dnodeInt.c +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dnodeDnode.h" -#include "dnodeMnode.h" -#include "dnodeTransport.h" -#include "dnodeVnodes.h" -#include "sync.h" -#include "tcache.h" -#include "tconfig.h" -#include "tnote.h" -#include "tstep.h" -#include "wal.h" - -static struct { - SStartupMsg startup; - EDnStat runStat; - SSteps *steps; -} tsInt; - -EDnStat dnodeGetRunStat() { return tsInt.runStat; } - -void dnodeSetRunStat(EDnStat stat) { - dDebug("runstat set to %d", stat); - tsInt.runStat = stat; -} - -void dnodeReportStartup(char *name, char *desc) { - SStartupMsg *pStartup = &tsInt.startup; - tstrncpy(pStartup->name, name, strlen(pStartup->name)); - tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); - pStartup->finished = 0; -} - -void dnodeReportStartupFinished(char *name, char *desc) { - SStartupMsg *pStartup = &tsInt.startup; - tstrncpy(pStartup->name, name, strlen(pStartup->name)); - tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); - pStartup->finished = 1; -} - -void dnodeGetStartup(SStartupMsg *pStartup) { memcpy(pStartup, &tsInt.startup, sizeof(SStartupMsg)); } - -static int32_t dnodeCheckRunning(char *dir) { - char filepath[256] = {0}; - snprintf(filepath, sizeof(filepath), "%s/.running", dir); - - FileFd fd = taosOpenFileCreateWriteTrunc(filepath); - if (fd < 0) { - dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno)); - return -1; - } - - int32_t ret = taosLockFile(fd); - if (ret != 0) { - dError("failed to lock file:%s since %s, quit", filepath, strerror(errno)); - taosCloseFile(fd); - return -1; - } - - return 0; -} - -static int32_t dnodeInitDir() { - sprintf(tsMnodeDir, "%s/mnode", tsDataDir); - sprintf(tsVnodeDir, "%s/vnode", tsDataDir); - sprintf(tsDnodeDir, "%s/dnode", tsDataDir); - - if (!taosMkDir(tsDnodeDir)) { - dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno)); - return -1; - } - - if (!taosMkDir(tsMnodeDir)) { - dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno)); - return -1; - } - - if (!taosMkDir(tsVnodeDir)) { - dError("failed to create dir:%s since %s", tsVnodeDir, strerror(errno)); - return -1; - } - - if (dnodeCheckRunning(tsDnodeDir) != 0) { - return -1; - } - - return 0; -} - -static int32_t dnodeInitMain() { - tscEmbedded = 1; - taosIgnSIGPIPE(); - taosBlockSIGPIPE(); - taosResolveCRC(); - taosInitGlobalCfg(); - taosReadGlobalLogCfg(); - taosSetCoreDump(tsEnableCoreFile); - - if (!taosMkDir(tsLogDir)) { - printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); - return -1; - } - - char temp[TSDB_FILENAME_LEN]; - sprintf(temp, "%s/taosdlog", tsLogDir); - if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { - printf("failed to init log file\n"); - } - - if (!taosReadGlobalCfg()) { - taosPrintGlobalCfg(); - dError("TDengine read global config failed"); - return -1; - } - - dInfo("start to initialize TDengine"); - - taosInitNotes(); - - if (taosCheckGlobalCfg() != 0) { - return -1; - } - - dnodeInitDir(); - - return 0; -} - -static void dnodeCleanupMain() { - taos_cleanup(); - taosCloseLog(); - taosStopCacheRefreshWorker(); -} - -int32_t dnodeInit() { - SSteps *steps = taosStepInit(10, dnodeReportStartup); - if (steps == NULL) return -1; -#if 1 - dnodeSetRunStat(DN_RUN_STAT_RUNNING); -#endif - taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain); - taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup); - taosStepAdd(steps, "dnode-tfs", NULL, NULL); - taosStepAdd(steps, "dnode-wal", walInit, walCleanUp); - //taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp); - taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode); - taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes); - taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode); - taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); - - tsInt.steps = steps; - taosStepExec(tsInt.steps); - - dnodeSetRunStat(DN_RUN_STAT_RUNNING); - dnodeReportStartupFinished("TDengine", "initialized successfully"); - dInfo("TDengine is initialized successfully"); - - return 0; -} - -void dnodeCleanup() { - if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) { - dnodeSetRunStat(DN_RUN_STAT_STOPPED); - taosStepCleanup(tsInt.steps); - tsInt.steps = NULL; - } -} diff --git a/source/dnode/mnode/impl/inc/mnodeInt.h b/source/dnode/mnode/impl/inc/mnodeInt.h index b0005acc20..43af281f27 100644 --- a/source/dnode/mnode/impl/inc/mnodeInt.h +++ b/source/dnode/mnode/impl/inc/mnodeInt.h @@ -32,21 +32,22 @@ typedef struct SMnodeBak { tmr_h timer; SSteps *pInitSteps; SSteps *pStartSteps; - SMnodePara para; + SMnodeOptions para; MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX]; } SMnodeBak; typedef struct SMnode { - int32_t dnodeId; - int64_t clusterId; - tmr_h timer; - SSteps *pInitSteps; - SSteps *pStartSteps; - SMnodePara para; - MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX]; - + int32_t dnodeId; + int64_t clusterId; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + tmr_h timer; + SSteps *pInitSteps; + SSteps *pStartSteps; struct SSdb *pSdb; - struct SServer *pServer; + struct SDnode *pServer; + MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX]; PutMsgToMnodeQFp putMsgToApplyMsgFp; SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 9ea4ebe0e6..bc4718ee5b 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -77,40 +77,20 @@ static void mnodeCleanupTimer() { tmr_h mnodeGetTimer() { return tsMint.timer; } -static int32_t mnodeSetPara(SMnode *pMnode, SMnodePara para) { - pMnode->dnodeId = para.dnodeId; - pMnode->clusterId = para.clusterId; - pMnode->putMsgToApplyMsgFp = para.putMsgToApplyMsgFp; - pMnode->sendMsgToDnodeFp = para.sendMsgToDnodeFp; - pMnode->sendMsgToMnodeFp = para.sendMsgToMnodeFp; - pMnode->sendRedirectMsgFp = para.sendRedirectMsgFp; +static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOptions *pOptions) { + pMnode->dnodeId = pOptions->dnodeId; + pMnode->clusterId = pOptions->clusterId; + pMnode->replica = pOptions->replica; + pMnode->selfIndex = pOptions->selfIndex; + memcpy(&pMnode->replicas, pOptions->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); + pMnode->pServer = pOptions->pServer; + pMnode->putMsgToApplyMsgFp = pOptions->putMsgToApplyMsgFp; + pMnode->sendMsgToDnodeFp = pOptions->sendMsgToDnodeFp; + pMnode->sendMsgToMnodeFp = pOptions->sendMsgToMnodeFp; + pMnode->sendRedirectMsgFp = pOptions->sendRedirectMsgFp; - if (pMnode->sendMsgToDnodeFp == NULL) { - terrno = TSDB_CODE_MND_APP_ERROR; - return -1; - } - - if (pMnode->sendMsgToMnodeFp == NULL) { - terrno = TSDB_CODE_MND_APP_ERROR; - return -1; - } - - if (pMnode->sendRedirectMsgFp == NULL) { - terrno = TSDB_CODE_MND_APP_ERROR; - return -1; - } - - if (pMnode->putMsgToApplyMsgFp == NULL) { - terrno = TSDB_CODE_MND_APP_ERROR; - return -1; - } - - if (pMnode->dnodeId < 0) { - terrno = TSDB_CODE_MND_APP_ERROR; - return -1; - } - - if (pMnode->clusterId < 0) { + if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || + pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { terrno = TSDB_CODE_MND_APP_ERROR; return -1; } @@ -156,12 +136,12 @@ static int32_t mnodeAllocStartSteps() { return 0; } -SMnode *mnodeCreate(SMnodePara para) { +SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) { SMnode *pMnode = calloc(1, sizeof(SMnode)); - if (mnodeSetPara(pMnode, para) != 0) { + if (mnodeSetOptions(pMnode, pOptions) != 0) { free(pMnode); - mError("failed to init mnode para since %s", terrstr()); + mError("failed to init mnode options since %s", terrstr()); return NULL; } @@ -175,35 +155,31 @@ SMnode *mnodeCreate(SMnodePara para) { return NULL; } - taosStepExec(tsMint.pInitSteps); - return NULL; -} + taosStepExec(tsMint.pInitSteps); -void mnodeCleanup() { taosStepCleanup(tsMint.pInitSteps); } - -int32_t mnodeDeploy(SMnodeCfg *pCfg) { if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) { if (sdbDeploy() != 0) { mError("failed to deploy sdb since %s", terrstr()); - return -1; + return NULL; + } else { + mInfo("mnode is deployed"); } } - mDebug("mnode is deployed"); - return 0; + taosStepExec(tsMint.pStartSteps); + + return pMnode; } -void mnodeUnDeploy() { sdbUnDeploy(); } +void mnodeClose(SMnode *pMnode) { free(pMnode); } -int32_t mnodeStart(SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); } +int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions) { return 0; } -int32_t mnodeAlter(SMnodeCfg *pCfg) { return 0; } +void mnodeDestroy(const char *path) { sdbUnDeploy(); } -void mnodeStop() { taosStepCleanup(tsMint.pStartSteps); } +int32_t mnodeGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { return 0; } -int32_t mnodeGetLoad(SMnodeLoad *pLoad) { return 0; } - -SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) { +SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -211,7 +187,7 @@ SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) { } if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) { - mnodeCleanupMsg(pMsg); + mnodeCleanupMsg(pMnode, pMsg); mError("can not get user from conn:%p", pMsg->rpcMsg.handle); terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; return NULL; @@ -223,7 +199,7 @@ SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) { return pMsg; } -void mnodeCleanupMsg(SMnodeMsg *pMsg) { +void mnodeCleanupMsg(SMnode *pMnode, SMnodeMsg *pMsg) { if (pMsg->pUser != NULL) { sdbRelease(pMsg->pUser); } @@ -232,6 +208,12 @@ void mnodeCleanupMsg(SMnodeMsg *pMsg) { } static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) { + if (!mnodeIsMaster()) { + mnodeSendRedirectMsg(NULL, &pMsg->rpcMsg, true); + mnodeCleanupMsg(NULL, pMsg); + return; + } + int32_t msgType = pMsg->rpcMsg.msgType; MnodeRpcFp fp = tsMint.msgFp[msgType]; @@ -250,25 +232,13 @@ void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) { } } -void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType) { - if (!mnodeIsMaster()) { - mnodeSendRedirectMsg(NULL, &pMsg->rpcMsg, true); - mnodeCleanupMsg(pMsg); - return; - } +void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } - switch (msgType) { - case MN_MSG_TYPE_READ: - case MN_MSG_TYPE_WRITE: - case MN_MSG_TYPE_SYNC: - mnodeProcessRpcMsg(pMsg); - break; - case MN_MSG_TYPE_APPLY: - break; - default: - break; - } -} +void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } + +void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } + +void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg) {} #if 0 diff --git a/source/dnode/mnode/impl/src/mnodeAuth.c b/source/dnode/mnode/impl/src/mnodeAuth.c index bb3289ebeb..ddd2b91ff3 100644 --- a/source/dnode/mnode/impl/src/mnodeAuth.c +++ b/source/dnode/mnode/impl/src/mnodeAuth.c @@ -20,7 +20,7 @@ int32_t mnodeInitAuth() { return 0; } void mnodeCleanupAuth() {} -int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { +int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { if (strcmp(user, TSDB_NETTEST_USER) == 0) { char pass[32] = {0}; taosEncryptPass((uint8_t *)user, strlen(user), pass); diff --git a/source/dnode/mnode/impl/src/mnodeTelem.c b/source/dnode/mnode/impl/src/mnodeTelem.c index ef1ac10eb6..206b94a6c7 100644 --- a/source/dnode/mnode/impl/src/mnodeTelem.c +++ b/source/dnode/mnode/impl/src/mnodeTelem.c @@ -174,7 +174,7 @@ static void mnodeAddVersionInfo(SBufferWriter* bw) { static void mnodeAddRuntimeInfo(SBufferWriter* bw) { SMnodeLoad load = {0}; - if (mnodeGetLoad(&load) != 0) { + if (mnodeGetLoad(NULL, &load) != 0) { return; }