diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 86fd7104df..a62cd06410 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -48,17 +48,17 @@ typedef struct { char secondEp[TSDB_EP_LEN]; SDiskCfg *pDisks; int32_t numOfDisks; -} SDndCfg; +} SDnodeOpt; typedef enum { DND_EVENT_STOP = 1, DND_EVENT_RELOAD } EDndEvent; /** * @brief Initialize and start the dnode. * - * @param pCfg Config of the dnode. + * @param pOption Option of the dnode. * @return SDnode* The dnode object. */ -SDnode *dndCreate(SDndCfg *pCfg); +SDnode *dndCreate(const SDnodeOpt *pOption); /** * @brief Stop and cleanup the dnode. @@ -80,7 +80,7 @@ int32_t dndRun(SDnode *pDnode); * @param pDnode The dnode object to close. * @param event The event to handle. */ -void dndeHandleEvent(SDnode *pDnode, EDndEvent event); +void dndHandleEvent(SDnode *pDnode, EDndEvent event); #ifdef __cplusplus } diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 1b6938b7c5..150f260841 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -32,17 +32,16 @@ typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef struct { - int32_t dnodeId; - int64_t clusterId; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; - SDnode *pDnode; - PutReqToMWriteQFp putReqToMWriteQFp; - PutReqToMReadQFp putReqToMReadQFp; - SendReqToDnodeFp sendReqFp; - SendReqToMnodeFp sendReqToMnodeFp; - SendRedirectRspFp sendRedirectRspFp; + int32_t dnodeId; + int64_t clusterId; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + SDnode *pDnode; + PutToQueueFp putReqToMWriteQFp; + PutToQueueFp putReqToMReadQFp; + SendReqFp sendReqFp; + SendMnodeReqFp sendReqToMnodeFp; } SMnodeOpt; /* ------------------------ SMnode ------------------------ */ diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index efea80a788..1b41da33bb 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -244,7 +244,7 @@ void tfsClosedir(STfsDir *pDir); * @param pTfs The fs object. * @param pInfo The info object. */ -void tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo); +int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo); #ifdef __cplusplus } diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 9c8c7a519f..4de2284dc3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -53,6 +53,10 @@ typedef struct { void *pNode; } SNodeMsg; +typedef int32_t (*PutToQueueFp)(void *pMgmt, struct SRpcMsg *pReq); +typedef int32_t (*SendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendMnodeReqFp)(void *pMgmt, struct SRpcMsg *rpcMsg); +typedef int32_t (*SendRspFp)(void *pMgmt, struct SRpcMsg *rpcMsg); typedef struct SRpcInit { uint16_t localPort; // local port char * label; // for debug purpose diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index abf8ee2e8a..d19b7f5387 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -40,8 +40,8 @@ int32_t dndInitBnode(SDnode *pDnode); void dndCleanupBnode(SDnode *pDnode); void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); +int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/inc/bmMsg.h b/source/dnode/mgmt/bnode/inc/bmMsg.h index ee7021579f..3444b19fc5 100644 --- a/source/dnode/mgmt/bnode/inc/bmMsg.h +++ b/source/dnode/mgmt/bnode/inc/bmMsg.h @@ -23,8 +23,6 @@ extern "C" { #endif void bmInitMsgHandles(SMgmtWrapper *pWrapper); -int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/src/bmMgmt.c b/source/dnode/mgmt/bnode/src/bmMgmt.c index 90b6d06564..9f76f3b6a0 100644 --- a/source/dnode/mgmt/bnode/src/bmMgmt.c +++ b/source/dnode/mgmt/bnode/src/bmMgmt.c @@ -180,8 +180,8 @@ static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dmSendRedirectRsp; - pOption->dnodeId = dmGetDnodeId(pDnode); - pOption->clusterId = dmGetClusterId(pDnode); + pOption->dnodeId = pDnode->dnodeId; + pOption->clusterId = pDnode->clusterId; pOption->sver = tsVersion; } @@ -268,7 +268,7 @@ int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - if (createReq.dnodeId != dmGetDnodeId(pDnode)) { + if (createReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION; dError("failed to create bnode since %s", terrstr()); return -1; @@ -284,7 +284,7 @@ int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - if (dropReq.dnodeId != dmGetDnodeId(pDnode)) { + if (dropReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION; dError("failed to drop bnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/bnode/src/bmMsg.c b/source/dnode/mgmt/bnode/src/bmMsg.c index 5cf1901656..15ca5e9cc5 100644 --- a/source/dnode/mgmt/bnode/src/bmMsg.c +++ b/source/dnode/mgmt/bnode/src/bmMsg.c @@ -17,8 +17,8 @@ #include "bmMsg.h" #include "bmWorker.h" -int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} -int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} +int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;} +int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg) {return 0;} void bmInitMsgHandles(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index cb66823c5e..66dbad8f88 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -43,7 +43,6 @@ #include "snode.h" #include "tfs.h" #include "vnode.h" -#include "monitor.h" #ifdef __cplusplus extern "C" { @@ -57,10 +56,10 @@ extern "C" { #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType; -typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; +typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus; typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; -typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStat; +typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; typedef struct SMgmtFp SMgmtFp; typedef struct SMgmtWrapper SMgmtWrapper; @@ -121,11 +120,22 @@ typedef struct { } STransMgmt; typedef struct SDnode { + int64_t clusterId; + int32_t dnodeId; + int32_t numOfSupportVnodes; int64_t rebootTime; + char *localEp; + char *localFqdn; + char *firstEp; + char *secondEp; + char *dataDir; + SDiskCfg *pDisks; + int32_t numOfDisks; + uint16_t serverPort; + bool dropped; EDndStatus status; EDndEvent event; EProcType procType; - SDndCfg cfg; SStartupReq startup; TdFilePtr pLockFile; STransMgmt trans; @@ -149,16 +159,16 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); void dndSendMonitorReport(SDnode *pDnode); // dndNode.h -SDnode *dndCreate(SDndCfg *pCfg); +SDnode *dndCreate(const SDnodeOpt *pOption); void dndClose(SDnode *pDnode); int32_t dndRun(SDnode *pDnode); -void dndeHandleEvent(SDnode *pDnode, EDndEvent event); -void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndHandleEvent(SDnode *pDnode, EDndEvent event); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); +void dndSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); // dndTransport.h -int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); +int32_t dndSendReqToMnode(void *pWrapper, SRpcMsg *pMsg); +int32_t dndSendReqToDnode(void *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); // dndWorker.h int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, diff --git a/source/dnode/mgmt/container/inc/dndNode.h b/source/dnode/mgmt/container/inc/dndNode.h index 0fbae69d89..a32541bf77 100644 --- a/source/dnode/mgmt/container/inc/dndNode.h +++ b/source/dnode/mgmt/container/inc/dndNode.h @@ -22,10 +22,10 @@ extern "C" { #endif -SDnode *dndCreate(SDndCfg *pCfg); +SDnode *dndCreate(const SDnodeOpt *pOption); void dndClose(SDnode *pDnode); int32_t dndRun(SDnode *pDnode); -void dndeHandleEvent(SDnode *pDnode, EDndEvent event); +void dndHandleEvent(SDnode *pDnode, EDndEvent event); void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); diff --git a/source/dnode/mgmt/container/inc/dndTransport.h b/source/dnode/mgmt/container/inc/dndTransport.h index 49297abd6c..b6998c674e 100644 --- a/source/dnode/mgmt/container/inc/dndTransport.h +++ b/source/dnode/mgmt/container/inc/dndTransport.h @@ -28,8 +28,8 @@ int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode); -int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); +int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pMsg); +int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/container/src/dndMonitor.c b/source/dnode/mgmt/container/src/dndMonitor.c index a25c1a299b..e92fa80745 100644 --- a/source/dnode/mgmt/container/src/dndMonitor.c +++ b/source/dnode/mgmt/container/src/dndMonitor.c @@ -25,16 +25,14 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) { tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); pInfo->tempdir.size = tsTempSpace.size; - vmGetTfsMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo); - - return 0; + return vmGetTfsMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo); } static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { - pInfo->dnode_id = dmGetDnodeId(pDnode); - tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN); - pInfo->cluster_id = dmGetClusterId(pDnode); pInfo->protocol = 1; + pInfo->dnode_id = pDnode->dnodeId; + pInfo->cluster_id = pDnode->clusterId; + tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN); } static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { @@ -56,7 +54,7 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { void dndSendMonitorReport(SDnode *pDnode) { if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; - dTrace("pDnode:%p, send monitor report to %s:%u", pDnode, tsMonitorFqdn, tsMonitorPort); + dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort); SMonInfo *pMonitor = monCreateMonitorInfo(); if (pMonitor == NULL) return; diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index 8a807b538d..39f5da9675 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -48,6 +48,7 @@ static int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openF static void dndCloseNode(SMgmtWrapper *pWrapper) { if (pWrapper->required) { (*pWrapper->fp.closeFp)(pWrapper); + pWrapper->required = false; } if (pWrapper->pProc) { taosProcCleanup(pWrapper->pProc); @@ -55,6 +56,26 @@ static void dndCloseNode(SMgmtWrapper *pWrapper) { } } +static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { + pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes; + pDnode->serverPort = pOption->serverPort; + pDnode->dataDir = strdup(pOption->dataDir); + pDnode->localEp = strdup(pOption->localEp); + pDnode->localFqdn = strdup(pOption->localFqdn); + pDnode->firstEp = strdup(pOption->firstEp); + pDnode->secondEp = strdup(pOption->secondEp); + pDnode->pDisks = pOption->pDisks; + pDnode->numOfDisks = pOption->numOfDisks; + pDnode->rebootTime = taosGetTimestampMs(); + + if (pDnode->dataDir == NULL || pDnode->dataDir == NULL || pDnode->dataDir == NULL || pDnode->dataDir == NULL || + pDnode->dataDir == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; +} + static void dndClearMemory(SDnode *pDnode) { for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; @@ -65,11 +86,16 @@ static void dndClearMemory(SDnode *pDnode) { taosCloseFile(&pDnode->pLockFile); pDnode->pLockFile = NULL; } - tfree(pDnode); + tfree(pDnode->localEp); + tfree(pDnode->localFqdn); + tfree(pDnode->firstEp); + tfree(pDnode->secondEp); + tfree(pDnode->dataDir); + free(pDnode); dDebug("dnode object memory is cleared, data:%p", pDnode); } -SDnode *dndCreate(SDndCfg *pCfg) { +SDnode *dndCreate(const SDnodeOpt *pOption) { dInfo("start to create dnode object"); int32_t code = -1; char path[PATH_MAX + 100]; @@ -81,10 +107,12 @@ SDnode *dndCreate(SDndCfg *pCfg) { goto _OVER; } - memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg)); + if (dndInitMemory(pDnode, pOption) != 0) { + goto _OVER; + } + dndSetStatus(pDnode, DND_STAT_INIT); - pDnode->rebootTime = taosGetTimestampMs(); - pDnode->pLockFile = dndCheckRunning(pCfg->dataDir); + pDnode->pLockFile = dndCheckRunning(pDnode->dataDir); if (pDnode->pLockFile == NULL) { goto _OVER; } @@ -112,10 +140,10 @@ SDnode *dndCreate(SDndCfg *pCfg) { for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - snprintf(path, sizeof(path), "%s%s%s", pCfg->dataDir, TD_DIRSEP, pDnode->wrappers[n].name); + snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); pWrapper->path = strdup(path); pWrapper->pDnode = pDnode; - if (pDnode->wrappers[n].path == NULL) { + if (pWrapper->path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } @@ -201,7 +229,7 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) { - dmSendRedirectRsp(pWrapper->pDnode, pRsp); + dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp); } else { rpcSendResponse(pRsp); } @@ -222,6 +250,11 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { } } +void dndSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { + pRsp->code = TSDB_CODE_APP_NOT_READY; + dndSendRsp(pWrapper, pRsp); +} + static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { dTrace("msg:%p, get from child queue", pMsg); SRpcMsg *pRpc = &pMsg->rpcMsg; @@ -354,7 +387,7 @@ int32_t dndRun(SDnode *pDnode) { return 0; } -void dndeHandleEvent(SDnode *pDnode, EDndEvent event) { +void dndHandleEvent(SDnode *pDnode, EDndEvent event) { dInfo("dnode object receive event %d, data:%p", event, pDnode); pDnode->event = event; } @@ -375,7 +408,7 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { - dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet); + dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE), pEpSet); } int32_t code = -1; diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 002ce73460..0621468c8b 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dndTransport.h" +#include "dndNode.h" #include "dmInt.h" #include "mmInt.h" @@ -133,7 +134,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp STransMgmt *pMgmt = &pDnode->trans; SEpSet epSet = {0}; - dmGetMnodeEpSet(pDnode, &epSet); + dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet); rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); } @@ -216,7 +217,7 @@ int32_t dndInitServer(SDnode *pDnode) { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = pDnode->cfg.serverPort; + rpcInit.localPort = pDnode->serverPort; rpcInit.label = "DND"; rpcInit.numOfThreads = numOfThreads; rpcInit.cfp = dndProcessRequest; @@ -271,8 +272,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { return 0; } -int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) { - STransMgmt *pMgmt = &pDnode->trans; +static int32_t dndSetReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { if (pMgmt->clientRpc == NULL) { terrno = TSDB_CODE_DND_OFFLINE; return -1; @@ -282,8 +282,18 @@ int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) { return 0; } -int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pReq) { - SEpSet epSet = {0}; - dmGetMnodeEpSet(pDnode, &epSet); - return dndSendReqToDnode(pDnode, &epSet, pReq); +int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) { + SMgmtWrapper *pWrapper = wrapper; + STransMgmt *pTrans = &pWrapper->pDnode->trans; + return dndSetReq(pTrans, pEpSet, pReq); +} + +int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) { + SMgmtWrapper *pWrapper = wrapper; + SDnode *pDnode = pWrapper->pDnode; + STransMgmt *pTrans = &pDnode->trans; + + SEpSet epSet = {0}; + dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet); + return dndSetReq(pTrans, &epSet, pReq); } diff --git a/source/dnode/mgmt/container/src/dndWorker.c b/source/dnode/mgmt/container/src/dndWorker.c index 35bcb4f0a7..c06c86db39 100644 --- a/source/dnode/mgmt/container/src/dndWorker.c +++ b/source/dnode/mgmt/container/src/dndWorker.c @@ -82,7 +82,7 @@ void dndCleanupWorker(SDnodeWorker *pWorker) { } int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) { - if (pWorker == NULL || pWorker->queue == NULL) { + if (pWorker == NULL || pWorker->queue == NULL ) { terrno = TSDB_CODE_INVALID_PARA; return -1; } diff --git a/source/dnode/mgmt/dnode/inc/dmMsg.h b/source/dnode/mgmt/dnode/inc/dm.h similarity index 66% rename from source/dnode/mgmt/dnode/inc/dmMsg.h rename to source/dnode/mgmt/dnode/inc/dm.h index 036471f17f..0336ae91d1 100644 --- a/source/dnode/mgmt/dnode/inc/dmMsg.h +++ b/source/dnode/mgmt/dnode/inc/dm.h @@ -13,24 +13,24 @@ * along with this program. If not, see . */ -#ifndef _TD_DND_DNODE_MSG_H_ -#define _TD_DND_DNODE_MSG_H_ +#ifndef _TD_DND_DNODE_H_ +#define _TD_DND_DNODE_H_ -#include "dmInt.h" +#include "dndInt.h" #ifdef __cplusplus extern "C" { #endif +void dmGetMgmtFp(SMgmtWrapper *pWrapper); +void dmGetMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet); +void dmUpdateMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet); +void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); void dmInitMsgHandles(SMgmtWrapper *pWrapper); -void dmSendStatusReq(SDnodeMgmt *pMgmt); -int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq); -void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp); -void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp); -void dmProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp); +int32_t dmStart(SMgmtWrapper *pWrapper); #ifdef __cplusplus } #endif -#endif /*_TD_DND_DNODE_MSG_H_*/ \ No newline at end of file +#endif /*_TD_DND_DNODE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/inc/dmFile.h b/source/dnode/mgmt/dnode/inc/dmFile.h deleted file mode 100644 index b1eebcd4e7..0000000000 --- a/source/dnode/mgmt/dnode/inc/dmFile.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DND_DNODE_FILE_H_ -#define _TD_DND_DNODE_FILE_H_ - -#include "dmInt.h" - -#ifdef __cplusplus -extern "C" { -#endif - -int32_t dmReadFile(SDnodeMgmt *pMgmt); -int32_t dmWriteFile(SDnodeMgmt *pMgmt); -void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DND_DNODE_FILE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index 78ac278682..6adfdd545b 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -16,24 +16,19 @@ #ifndef _TD_DND_DNODE_INT_H_ #define _TD_DND_DNODE_INT_H_ -#include "dndInt.h" +#include "dm.h" #ifdef __cplusplus extern "C" { #endif typedef struct SDnodeMgmt { - int32_t dnodeId; - int32_t dropped; - int64_t clusterId; - char localEp[TSDB_EP_LEN]; - char firstEp[TSDB_EP_LEN]; int64_t dver; int64_t updateTime; int8_t statusSent; SEpSet mnodeEpSet; SHashObj *dnodeHash; - SArray *pDnodeEps; + SArray *dnodeEps; pthread_t *threadId; SRWLatch latch; SDnodeWorker mgmtWorker; @@ -42,17 +37,24 @@ typedef struct SDnodeMgmt { SDnode *pDnode; } SDnodeMgmt; -// dmInt.h -void dmGetMgmtFp(SMgmtWrapper *pWrapper); -int32_t dmGetDnodeId(SDnode *pDnode); -int64_t dmGetClusterId(SDnode *pDnode); -void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); -void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); -void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); -void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); +// dmFile.c +int32_t dmReadFile(SDnodeMgmt *pMgmt); +int32_t dmWriteFile(SDnodeMgmt *pMgmt); +void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps); -// dmWorker.h +// dmInt.c + +// dmMsg.c +void dmSendStatusReq(SDnodeMgmt *pMgmt); +int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); + +// dmWorker.c int32_t dmStartWorker(SDnodeMgmt *pMgmt); +void dmStopWorker(SDnodeMgmt *pMgmt); +int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/inc/dmWorker.h b/source/dnode/mgmt/dnode/inc/dmWorker.h deleted file mode 100644 index bdd994dfaf..0000000000 --- a/source/dnode/mgmt/dnode/inc/dmWorker.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DND_DNODE_WORKER_H_ -#define _TD_DND_DNODE_WORKER_H_ - -#include "dmInt.h" - -#ifdef __cplusplus -extern "C" { -#endif - -int32_t dmStartWorker(SDnodeMgmt *pMgmt); -void dmStopWorker(SDnodeMgmt *pMgmt); -int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DND_DNODE_WORKER_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/src/dmFile.c b/source/dnode/mgmt/dnode/src/dmFile.c index 03fc47ae27..82f8f539aa 100644 --- a/source/dnode/mgmt/dnode/src/dmFile.c +++ b/source/dnode/mgmt/dnode/src/dmFile.c @@ -14,11 +14,11 @@ */ #define _DEFAULT_SOURCE -#include "dmFile.h" +#include "dmInt.h" static void dmPrintDnodes(SDnodeMgmt *pMgmt); -static bool dmIsEpChanged(SDnodeMgmt *pMgmt, const char *ep); -static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps); +static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep); +static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps); int32_t dmReadFile(SDnodeMgmt *pMgmt) { int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; @@ -28,9 +28,10 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) { cJSON *root = NULL; char file[PATH_MAX]; TdFilePtr pFile = NULL; + SDnode *pDnode = pMgmt->pDnode; - pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); - if (pMgmt->pDnodeEps == NULL) { + pMgmt->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); + if (pMgmt->dnodeEps == NULL) { dError("failed to calloc dnodeEp array since %s", strerror(errno)); goto PRASE_DNODE_OVER; } @@ -61,21 +62,21 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) { dError("failed to read %s since dnodeId not found", file); goto PRASE_DNODE_OVER; } - pMgmt->dnodeId = dnodeId->valueint; + pDnode->dnodeId = dnodeId->valueint; cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { dError("failed to read %s since clusterId not found", file); goto PRASE_DNODE_OVER; } - pMgmt->clusterId = atoll(clusterId->valuestring); + pDnode->clusterId = atoll(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", file); goto PRASE_DNODE_OVER; } - pMgmt->dropped = dropped->valueint; + pDnode->dropped = dropped->valueint; cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); if (!dnodes || dnodes->type != cJSON_Array) { @@ -125,7 +126,7 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) { } dnodeEp.isMnode = isMnode->valueint; - taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); + taosArrayPush(pMgmt->dnodeEps, &dnodeEp); } code = 0; @@ -137,25 +138,27 @@ PRASE_DNODE_OVER: if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); - if (dmIsEpChanged(pMgmt, pMgmt->pDnode->cfg.localEp)) { - dError("localEp %s different with %s and need reconfigured", pMgmt->pDnode->cfg.localEp, file); + if (dmIsEpChanged(pMgmt, pDnode->dnodeId, pDnode->localEp)) { + dError("localEp %s different with %s and need reconfigured", pDnode->localEp, file); return -1; } - if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) { + if (taosArrayGetSize(pMgmt->dnodeEps) == 0) { SDnodeEp dnodeEp = {0}; dnodeEp.isMnode = 1; - taosGetFqdnPortFromEp(pMgmt->pDnode->cfg.firstEp, &dnodeEp.ep); - taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); + taosGetFqdnPortFromEp(pDnode->firstEp, &dnodeEp.ep); + taosArrayPush(pMgmt->dnodeEps, &dnodeEp); } - dmResetDnodes(pMgmt, pMgmt->pDnodeEps); + dmResetDnodes(pMgmt, pMgmt->dnodeEps); terrno = code; return code; } int32_t dmWriteFile(SDnodeMgmt *pMgmt) { + SDnode *pDnode = pMgmt->pDnode; + char file[PATH_MAX]; snprintf(file, sizeof(file), "%s%sdnode.json.bak", pMgmt->path, TD_DIRSEP); @@ -171,14 +174,14 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pDnode->dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pDnode->clusterId); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pDnode->dropped); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); - int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); + int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->dnodeEps); for (int32_t i = 0; i < numOfEps; ++i) { - SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->pDnodeEps, i); + SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->dnodeEps, i); len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn); len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port); @@ -210,20 +213,20 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) { return 0; } -void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { - int32_t numOfEps = taosArrayGetSize(pDnodeEps); +void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) { + int32_t numOfEps = taosArrayGetSize(dnodeEps); if (numOfEps <= 0) return; taosWLockLatch(&pMgmt->latch); - int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); + int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->dnodeEps); if (numOfEps != numOfEpsOld) { - dmResetDnodes(pMgmt, pDnodeEps); + dmResetDnodes(pMgmt, dnodeEps); dmWriteFile(pMgmt); } else { int32_t size = numOfEps * sizeof(SDnodeEp); - if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) { - dmResetDnodes(pMgmt, pDnodeEps); + if (memcmp(pMgmt->dnodeEps->pData, dnodeEps->pData, size) != 0) { + dmResetDnodes(pMgmt, dnodeEps); dmWriteFile(pMgmt); } } @@ -231,10 +234,10 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { taosWUnLockLatch(&pMgmt->latch); } -static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { - if (pMgmt->pDnodeEps != pDnodeEps) { - SArray *tmp = pMgmt->pDnodeEps; - pMgmt->pDnodeEps = taosArrayDup(pDnodeEps); +static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *dnodeEps) { + if (pMgmt->dnodeEps != dnodeEps) { + SArray *tmp = pMgmt->dnodeEps; + pMgmt->dnodeEps = taosArrayDup(dnodeEps); taosArrayDestroy(tmp); } @@ -242,10 +245,10 @@ static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { pMgmt->mnodeEpSet.numOfEps = 0; int32_t mIndex = 0; - int32_t numOfEps = (int32_t)taosArrayGetSize(pDnodeEps); + int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps); for (int32_t i = 0; i < numOfEps; i++) { - SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i); + SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i); if (!pDnodeEp->isMnode) continue; if (mIndex >= TSDB_MAX_REPLICA) continue; pMgmt->mnodeEpSet.numOfEps++; @@ -255,7 +258,7 @@ static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { } for (int32_t i = 0; i < numOfEps; i++) { - SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i); + SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i); taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); } @@ -263,19 +266,19 @@ static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { } static void dmPrintDnodes(SDnodeMgmt *pMgmt) { - int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); + int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->dnodeEps); dDebug("print dnode ep list, num:%d", numOfEps); for (int32_t i = 0; i < numOfEps; i++) { - SDnodeEp *pEp = taosArrayGet(pMgmt->pDnodeEps, i); + SDnodeEp *pEp = taosArrayGet(pMgmt->dnodeEps, i); dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode); } } -static bool dmIsEpChanged(SDnodeMgmt *pMgmt, const char *ep) { +static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep) { bool changed = false; taosRLockLatch(&pMgmt->latch); - SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &pMgmt->dnodeId, sizeof(int32_t)); + SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { char epstr[TSDB_EP_LEN + 1]; snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c index 4486d8a19d..61ca6ac78c 100644 --- a/source/dnode/mgmt/dnode/src/dmInt.c +++ b/source/dnode/mgmt/dnode/src/dmInt.c @@ -15,45 +15,18 @@ #define _DEFAULT_SOURCE #include "dmInt.h" -#include "dmFile.h" -#include "dmMsg.h" -#include "dmWorker.h" - -int32_t dmGetDnodeId(SDnode *pDnode) { - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); - SDnodeMgmt *pMgmt = pWrapper->pMgmt; - - taosRLockLatch(&pMgmt->latch); - int32_t dnodeId = pMgmt->dnodeId; - taosRUnLockLatch(&pMgmt->latch); - return dnodeId; -} - -int64_t dmGetClusterId(SDnode *pDnode) { - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); - SDnodeMgmt *pMgmt = pWrapper->pMgmt; - - taosRLockLatch(&pMgmt->latch); - int64_t clusterId = pMgmt->clusterId; - taosRUnLockLatch(&pMgmt->latch); - return clusterId; -} - -void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); - SDnodeMgmt *pMgmt = pWrapper->pMgmt; +void dmGetMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet) { + SDnodeMgmt *pMgmt = pWrapper->pMgmt; taosRLockLatch(&pMgmt->latch); *pEpSet = pMgmt->mnodeEpSet; taosRUnLockLatch(&pMgmt->latch); } -void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { +void dmUpdateMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet) { dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); - SDnodeMgmt *pMgmt = pWrapper->pMgmt; - + SDnodeMgmt *pMgmt = pWrapper->pMgmt; taosWLockLatch(&pMgmt->latch); pMgmt->mnodeEpSet = *pEpSet; @@ -64,10 +37,8 @@ void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { taosWUnLockLatch(&pMgmt->latch); } -void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); - SDnodeMgmt *pMgmt = pWrapper->pMgmt; - +void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { + SDnodeMgmt *pMgmt = pWrapper->pMgmt; taosRLockLatch(&pMgmt->latch); SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); @@ -86,16 +57,17 @@ void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint1 taosRUnLockLatch(&pMgmt->latch); } -void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) { - tmsg_t msgType = pReq->msgType; +void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { + SDnode *pDnode = pWrapper->pDnode; + tmsg_t msgType = pReq->msgType; SEpSet epSet = {0}; - dmGetMnodeEpSet(pDnode, &epSet); + dmGetMnodeEpSet(pWrapper, &epSet); dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); - if (strcmp(epSet.eps[i].fqdn, pDnode->cfg.localFqdn) == 0 && epSet.eps[i].port == pDnode->cfg.serverPort) { + if (strcmp(epSet.eps[i].fqdn, pDnode->localFqdn) == 0 && epSet.eps[i].port == pDnode->serverPort) { epSet.inUse = (i + 1) % epSet.numOfEps; } @@ -105,18 +77,21 @@ void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) { rpcSendRedirectRsp(pReq->handle, &epSet); } +int32_t dmStart(SMgmtWrapper *pWrapper) { + SDnodeMgmt *pMgmt = pWrapper->pMgmt; + return dmStartWorker(pMgmt); +} + int32_t dmInit(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt)); dInfo("dnode-mgmt is initialized"); - pMgmt->dnodeId = 0; - pMgmt->dropped = 0; - pMgmt->clusterId = 0; + pDnode->dnodeId = 0; + pDnode->dropped = 0; + pDnode->clusterId = 0; pMgmt->path = pWrapper->path; pMgmt->pDnode = pDnode; - memcpy(pMgmt->localEp, pDnode->cfg.localEp, TSDB_EP_LEN); - memcpy(pMgmt->firstEp, pDnode->cfg.firstEp, TSDB_EP_LEN); taosInitRWLatch(&pMgmt->latch); pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -131,7 +106,7 @@ int32_t dmInit(SMgmtWrapper *pWrapper) { return -1; } - if (pMgmt->dropped) { + if (pDnode->dropped) { dError("dnode will not start since its already dropped"); return -1; } @@ -150,9 +125,9 @@ void dmCleanup(SMgmtWrapper *pWrapper) { taosWLockLatch(&pMgmt->latch); - if (pMgmt->pDnodeEps != NULL) { - taosArrayDestroy(pMgmt->pDnodeEps); - pMgmt->pDnodeEps = NULL; + if (pMgmt->dnodeEps != NULL) { + taosArrayDestroy(pMgmt->dnodeEps); + pMgmt->dnodeEps = NULL; } if (pMgmt->dnodeHash != NULL) { diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index 931c0eb5a4..4a8ca025ab 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -14,9 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dmMsg.h" -#include "dmFile.h" -#include "dmWorker.h" +#include "dmInt.h" #include "vmInt.h" void dmSendStatusReq(SDnodeMgmt *pMgmt) { @@ -26,13 +24,13 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { taosRLockLatch(&pMgmt->latch); req.sver = tsVersion; req.dver = pMgmt->dver; - req.dnodeId = pMgmt->dnodeId; - req.clusterId = pMgmt->clusterId; + req.dnodeId = pDnode->dnodeId; + req.clusterId = pDnode->clusterId; req.rebootTime = pDnode->rebootTime; req.updateTime = pMgmt->updateTime; req.numOfCores = tsNumOfCores; - req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes; - memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN); + req.numOfSupportVnodes = pDnode->numOfSupportVnodes; + memcpy(req.dnodeEp, pDnode->localEp, TSDB_EP_LEN); req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.checkTime = 0; @@ -59,23 +57,26 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { } static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { - if (pMgmt->dnodeId == 0) { + SDnode *pDnode = pMgmt->pDnode; + + if (pDnode->dnodeId == 0) { dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); taosWLockLatch(&pMgmt->latch); - pMgmt->dnodeId = pCfg->dnodeId; - pMgmt->clusterId = pCfg->clusterId; + pDnode->dnodeId = pCfg->dnodeId; + pDnode->clusterId = pCfg->clusterId; dmWriteFile(pMgmt); taosWUnLockLatch(&pMgmt->latch); } } -void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { - SDnodeMgmt *pMgmt = dndGetWrapper(pDnode, DNODE)->pMgmt; +int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnode *pDnode = pMgmt->pDnode; + SRpcMsg *pRsp = &pMsg->rpcMsg; if (pRsp->code != TSDB_CODE_SUCCESS) { - if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { - dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId); - pMgmt->dropped = 1; + if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pDnode->dropped && pDnode->dnodeId > 0) { + dInfo("dnode:%d, set to dropped since not exist in mnode", pDnode->dnodeId); + pDnode->dropped = 1; dmWriteFile(pMgmt); } } else { @@ -92,13 +93,22 @@ void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { pMgmt->statusSent = 0; } -void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("auth rsp is received, but not supported yet"); } +int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SRpcMsg *pRsp = &pMsg->rpcMsg; + dError("auth rsp is received, but not supported yet"); + return 0; +} -void dmProcessGrantRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("grant rsp is received, but not supported yet"); } +int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SRpcMsg *pRsp = &pMsg->rpcMsg; + dError("grant rsp is received, but not supported yet"); + return 0; +} -int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq) { - dError("config req is received, but not supported yet"); +int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SRpcMsg *pReq = &pMsg->rpcMsg; SDCfgDnodeReq *pCfg = pReq->pCont; + dError("config req is received, but not supported yet"); return TSDB_CODE_OPS_NOT_SUPPORT; } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index d39d5fb15b..de57d1714c 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -14,8 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dmWorker.h" -#include "dmMsg.h" +#include "dmInt.h" #include "bmInt.h" #include "mmInt.h" @@ -34,7 +33,7 @@ static void *dmThreadRoutine(void *param) { while (true) { pthread_testcancel(); taosMsleep(200); - if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) { + if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->dropped) { continue; } @@ -54,68 +53,68 @@ static void *dmThreadRoutine(void *param) { } } -static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { - int32_t code = 0; - SRpcMsg *pMsg = &pNodeMsg->rpcMsg; - dTrace("msg:%p, will be processed", pNodeMsg); +static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pMsg) { + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; + dTrace("msg:%p, will be processed", pMsg); - switch (pMsg->msgType) { + switch (msgType) { case TDMT_DND_CREATE_MNODE: - code = mmProcessCreateReq(pDnode, pMsg); + code = mmProcessCreateReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg); break; case TDMT_DND_ALTER_MNODE: - code = mmProcessAlterReq(pDnode, pMsg); + code = mmProcessAlterReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg); break; case TDMT_DND_DROP_MNODE: - code = mmProcessDropReq(pDnode, pMsg); + code = mmProcessDropReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg); break; case TDMT_DND_CREATE_QNODE: - code = qmProcessCreateReq(pDnode, pMsg); + code = qmProcessCreateReq(dndGetWrapper(pDnode, QNODE)->pMgmt, pMsg); break; case TDMT_DND_DROP_QNODE: - code = qmProcessDropReq(pDnode, pMsg); + code = qmProcessDropReq(dndGetWrapper(pDnode, QNODE)->pMgmt, pMsg); break; case TDMT_DND_CREATE_SNODE: - code = smProcessCreateReq(pDnode, pMsg); + code = smProcessCreateReq(dndGetWrapper(pDnode, SNODE)->pMgmt, pMsg); break; case TDMT_DND_DROP_SNODE: - code = smProcessDropReq(pDnode, pMsg); + code = smProcessDropReq(dndGetWrapper(pDnode, SNODE)->pMgmt, pMsg); break; case TDMT_DND_CREATE_BNODE: - code = bmProcessCreateReq(pDnode, pMsg); + code = bmProcessCreateReq(dndGetWrapper(pDnode, BNODE)->pMgmt, pMsg); break; case TDMT_DND_DROP_BNODE: - code = bmProcessDropReq(pDnode, pMsg); + code = bmProcessDropReq(dndGetWrapper(pDnode, BNODE)->pMgmt, pMsg); break; case TDMT_DND_CONFIG_DNODE: - code = dmProcessConfigReq(pDnode, pMsg); + code = dmProcessConfigReq(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); break; case TDMT_MND_STATUS_RSP: - dmProcessStatusRsp(pDnode, pMsg); + code = dmProcessStatusRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); break; case TDMT_MND_AUTH_RSP: - dmProcessAuthRsp(pDnode, pMsg); + code = dmProcessAuthRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); break; case TDMT_MND_GRANT_RSP: - dmProcessGrantRsp(pDnode, pMsg); + code = dmProcessGrantRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; code = -1; - dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); + dError("RPC %p, dnode msg:%s not processed", pMsg->rpcMsg.handle, TMSG_INFO(msgType)); break; } - if (pMsg->msgType & 1u) { + if (msgType & 1u) { if (code != 0) code = terrno; - SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; + SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle}; rpcSendResponse(&rsp); } - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - taosFreeQitem(pNodeMsg); - dTrace("msg:%p, is freed", pNodeMsg); + rpcFreeCont(pMsg->rpcMsg.pCont); + pMsg->rpcMsg.pCont = NULL; + taosFreeQitem(pMsg); + dTrace("msg:%p, is freed", pMsg); } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/main/inc/dndMain.h b/source/dnode/mgmt/main/inc/dndMain.h index 72aedbe338..1958d628a0 100644 --- a/source/dnode/mgmt/main/inc/dndMain.h +++ b/source/dnode/mgmt/main/inc/dndMain.h @@ -36,10 +36,10 @@ extern "C" { #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} -void dndDumpCfg(); -void dndPrintVersion(); -void dndGenerateGrant(); -SDndCfg dndGetCfg(); +void dndDumpCfg(); +void dndPrintVersion(); +void dndGenerateGrant(); +SDnodeOpt dndGetOpt(); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/main/src/dndMain.c b/source/dnode/mgmt/main/src/dndMain.c index 370e9aa7b5..11ce8971f6 100644 --- a/source/dnode/mgmt/main/src/dndMain.c +++ b/source/dnode/mgmt/main/src/dndMain.c @@ -28,7 +28,7 @@ static struct { static void dndSigintHandle(int signum, void *info, void *ctx) { dInfo("singal:%d is received", signum); - dndeHandleEvent(global.pDnode, DND_EVENT_STOP); + dndHandleEvent(global.pDnode, DND_EVENT_STOP); } static void dndSetSignalHandle() { @@ -71,8 +71,9 @@ static int32_t dndRunDnode() { return -1; } - SDndCfg objCfg = dndGetCfg(); - SDnode *pDnode = dndCreate(&objCfg); + SDnodeOpt option = dndGetOpt(); + + SDnode *pDnode = dndCreate(&option); if (pDnode == NULL) { dError("failed to to create dnode object since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/main/src/dndUtil.c b/source/dnode/mgmt/main/src/dndUtil.c index 163aad6694..e07ef68c77 100644 --- a/source/dnode/mgmt/main/src/dndUtil.c +++ b/source/dnode/mgmt/main/src/dndUtil.c @@ -38,18 +38,18 @@ void dndDumpCfg() { cfgDumpCfg(pCfg, 0, 1); } -SDndCfg dndGetCfg() { - SConfig *pCfg = taosGetCfg(); - SDndCfg objCfg = {0}; +SDnodeOpt dndGetOpt() { + SConfig *pCfg = taosGetCfg(); + SDnodeOpt option = {0}; - objCfg.numOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32; - tstrncpy(objCfg.dataDir, tsDataDir, sizeof(objCfg.dataDir)); - tstrncpy(objCfg.firstEp, tsFirst, sizeof(objCfg.firstEp)); - tstrncpy(objCfg.secondEp, tsSecond, sizeof(objCfg.firstEp)); - objCfg.serverPort = tsServerPort; - tstrncpy(objCfg.localFqdn, tsLocalFqdn, sizeof(objCfg.localFqdn)); - snprintf(objCfg.localEp, sizeof(objCfg.localEp), "%s:%u", objCfg.localFqdn, objCfg.serverPort); - objCfg.pDisks = tsDiskCfg; - objCfg.numOfDisks = tsDiskCfgNum; - return objCfg; + option.numOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32; + tstrncpy(option.dataDir, tsDataDir, sizeof(option.dataDir)); + tstrncpy(option.firstEp, tsFirst, sizeof(option.firstEp)); + tstrncpy(option.secondEp, tsSecond, sizeof(option.firstEp)); + option.serverPort = tsServerPort; + tstrncpy(option.localFqdn, tsLocalFqdn, sizeof(option.localFqdn)); + snprintf(option.localEp, sizeof(option.localEp), "%s:%u", option.localFqdn, option.serverPort); + option.pDisks = tsDiskCfg; + option.numOfDisks = tsDiskCfgNum; + return option; } diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 70269b79bb..7ceec45ae0 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -54,9 +54,9 @@ int32_t mmDrop(SMnodeMgmt *pMgmt); int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate); // mmHandle.h -int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mnode/inc/mmMsg.h b/source/dnode/mgmt/mnode/inc/mmMsg.h index f07c44705b..bf577dad10 100644 --- a/source/dnode/mgmt/mnode/inc/mmMsg.h +++ b/source/dnode/mgmt/mnode/inc/mmMsg.h @@ -24,9 +24,9 @@ extern "C" { void mmInitMsgHandles(SMgmtWrapper *pWrapper); -int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +// int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +// int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +// int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mnode/inc/mmWorker.h b/source/dnode/mgmt/mnode/inc/mmWorker.h index 45a737b9de..91c1b30ff8 100644 --- a/source/dnode/mgmt/mnode/inc/mmWorker.h +++ b/source/dnode/mgmt/mnode/inc/mmWorker.h @@ -28,8 +28,8 @@ int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmPutMsgToWriteQueue(void *wrapper, SRpcMsg *pRpcMsg); +int32_t mmPutMsgToReadQueue(void *wrapper, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index 36f4b40fa8..a33f570a19 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -142,11 +142,10 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pOption->pDnode = pDnode; pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; - pOption->sendRedirectRspFp = dmSendRedirectRsp; pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue; pOption->putReqToMReadQFp = mmPutMsgToReadQueue; - pOption->dnodeId = dmGetDnodeId(pDnode); - pOption->clusterId = dmGetClusterId(pDnode); + pOption->dnodeId = pDnode->dnodeId; + pOption->clusterId = pDnode->clusterId; } static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { @@ -157,8 +156,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pOption->selfIndex = 0; SReplica *pReplica = &pOption->replicas[0]; pReplica->id = 1; - pReplica->port = pDnode->cfg.serverPort; - memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN); + pReplica->port = pDnode->serverPort; + memcpy(pReplica->fqdn, pDnode->localFqdn, TSDB_FQDN_LEN); pMgmt->selfIndex = pOption->selfIndex; pMgmt->replica = pOption->replica; @@ -176,8 +175,8 @@ int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnod SDnode *pDnode = pMgmt->pDnode; mmInitOption(pMgmt, pOption); - pOption->dnodeId = dmGetDnodeId(pDnode); - pOption->clusterId = dmGetClusterId(pDnode); + pOption->dnodeId = pDnode->dnodeId; + pOption->clusterId = pDnode->clusterId; pOption->replica = pCreate->replica; pOption->selfIndex = -1; @@ -259,15 +258,15 @@ _OVER: } static bool mmDeployRequired(SDnode *pDnode) { - if (dmGetDnodeId(pDnode) > 0) { + if (pDnode->dnodeId > 0) { return false; } - if (dmGetClusterId(pDnode) > 0) { + if (pDnode->clusterId > 0) { return false; } - if (strcmp(pDnode->cfg.localEp, pDnode->cfg.firstEp) != 0) { + if (strcmp(pDnode->localEp, pDnode->firstEp) != 0) { return false; } diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index dccd49fe9d..ef8585d61a 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -18,9 +18,9 @@ #include "dmInt.h" #include "mmWorker.h" -int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnode *pDnode = pMgmt->pDnode; + SRpcMsg *pReq = &pMsg->rpcMsg; SDCreateMnodeReq createReq = {0}; if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { @@ -28,7 +28,7 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - if (createReq.replica <= 1 || createReq.dnodeId != dmGetDnodeId(pDnode)) { + if (createReq.replica <= 1 || createReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; dError("failed to create mnode since %s", terrstr()); return -1; @@ -53,9 +53,9 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { return mmOpen(pMgmt, &option); } -int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) { - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnode *pDnode = pMgmt->pDnode; + SRpcMsg *pReq = &pMsg->rpcMsg; SDAlterMnodeReq alterReq = {0}; if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { @@ -63,7 +63,7 @@ int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - if (alterReq.dnodeId != dmGetDnodeId(pDnode)) { + if (alterReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; dError("failed to alter mnode since %s", terrstr()); return -1; @@ -90,9 +90,9 @@ int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) { return code; } -int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnode *pDnode = pMgmt->pDnode; + SRpcMsg *pReq = &pMsg->rpcMsg; SDDropMnodeReq dropReq = {0}; if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { @@ -100,7 +100,7 @@ int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - if (dropReq.dnodeId != dmGetDnodeId(pDnode)) { + if (dropReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; dError("failed to drop mnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 52cfc98682..3a195d9301 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -125,14 +125,16 @@ static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker return code; } -int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) { - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - return mmPutRpcMsgToWorker(pWrapper, &pMgmt->writeWorker, pRpc); +int32_t mmPutMsgToWriteQueue(void *wrapper, SRpcMsg *pRpc) { + // SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); + // SMnodeMgmt *pMgmt = pWrapper->pMgmt; + // return mmPutRpcMsgToWorker(pWrapper, &pMgmt->writeWorker, pRpc); + return 0; } -int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) { - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - return mmPutRpcMsgToWorker(pWrapper, &pMgmt->readWorker, pRpc); +int32_t mmPutMsgToReadQueue(void *wrapper, SRpcMsg *pRpc) { + // SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); + // SMnodeMgmt *pMgmt = pWrapper->pMgmt; + // return mmPutRpcMsgToWorker(pWrapper, &pMgmt->readWorker, pRpc); + return 0; } diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index aa31eade55..2eccdd8dcf 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -45,8 +45,8 @@ void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); // qmHandle.h -int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/qnode/inc/qmMsg.h b/source/dnode/mgmt/qnode/inc/qmMsg.h index 6b86f3eee8..ef8e88722c 100644 --- a/source/dnode/mgmt/qnode/inc/qmMsg.h +++ b/source/dnode/mgmt/qnode/inc/qmMsg.h @@ -23,8 +23,8 @@ extern "C" { #endif void qmInitMsgHandles(SMgmtWrapper *pWrapper); -int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/qnode/src/qmMgmt.c b/source/dnode/mgmt/qnode/src/qmMgmt.c index c29704582a..76746d0f77 100644 --- a/source/dnode/mgmt/qnode/src/qmMgmt.c +++ b/source/dnode/mgmt/qnode/src/qmMgmt.c @@ -185,9 +185,9 @@ static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { pOption->pDnode = pDnode; pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; - pOption->sendRedirectRspFp = dmSendRedirectRsp; - pOption->dnodeId = dmGetDnodeId(pDnode); - pOption->clusterId = dmGetClusterId(pDnode); + pOption->sendRedirectRspFp = dndSendRedirectRsp; + pOption->dnodeId = pDnode->dnodeId; + pOption->clusterId = pDnode->clusterId; pOption->sver = tsVersion; } @@ -274,7 +274,7 @@ int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - if (createReq.dnodeId != dmGetDnodeId(pDnode)) { + if (createReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION; dError("failed to create qnode since %s", terrstr()); return -1; @@ -290,7 +290,7 @@ int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - if (dropReq.dnodeId != dmGetDnodeId(pDnode)) { + if (dropReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION; dError("failed to drop qnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/qnode/src/qmMsg.c b/source/dnode/mgmt/qnode/src/qmMsg.c index af0747a357..871a4603b4 100644 --- a/source/dnode/mgmt/qnode/src/qmMsg.c +++ b/source/dnode/mgmt/qnode/src/qmMsg.c @@ -17,8 +17,8 @@ #include "qmMsg.h" #include "qmWorker.h" -int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} -int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg){return 0;} +int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;} +int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg){return 0;} void qmInitMsgHandles(SMgmtWrapper *pWrapper) { } diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index a4874dda08..75dd9bcc55 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -39,8 +39,8 @@ int32_t dndInitSnode(SDnode *pDnode); void dndCleanupSnode(SDnode *pDnode); void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/snode/src/smMgmt.c b/source/dnode/mgmt/snode/src/smMgmt.c index 12468085b1..09959bcf09 100644 --- a/source/dnode/mgmt/snode/src/smMgmt.c +++ b/source/dnode/mgmt/snode/src/smMgmt.c @@ -210,9 +210,9 @@ static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { pOption->pDnode = pDnode; pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; - pOption->sendRedirectRspFp = dmSendRedirectRsp; - pOption->dnodeId = dmGetDnodeId(pDnode); - pOption->clusterId = dmGetClusterId(pDnode); + pOption->sendRedirectRspFp = dndSendRedirectRsp; + pOption->dnodeId = pDnode->dnodeId; + pOption->clusterId = pDnode->clusterId; pOption->sver = tsVersion; } @@ -299,7 +299,7 @@ int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - if (createReq.dnodeId != dmGetDnodeId(pDnode)) { + if (createReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION; dError("failed to create snode since %s", terrstr()); return -1; @@ -315,7 +315,7 @@ int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - if (dropReq.dnodeId != dmGetDnodeId(pDnode)) { + if (dropReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION; dError("failed to drop snode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/snode/src/smMsg.c b/source/dnode/mgmt/snode/src/smMsg.c index 44bc8fe329..859474b60b 100644 --- a/source/dnode/mgmt/snode/src/smMsg.c +++ b/source/dnode/mgmt/snode/src/smMsg.c @@ -17,8 +17,8 @@ #include "smMsg.h" #include "smWorker.h" -int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} -int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} +int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;} +int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;} void smInitMsgHandles(SMgmtWrapper *pWrapper) { } diff --git a/source/dnode/mgmt/test/sut/inc/server.h b/source/dnode/mgmt/test/sut/inc/server.h index 220e8e2090..5f9e4846a7 100644 --- a/source/dnode/mgmt/test/sut/inc/server.h +++ b/source/dnode/mgmt/test/sut/inc/server.h @@ -24,7 +24,7 @@ class TestServer { bool DoStart(); private: - SDndCfg BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp); + SDnodeOpt BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp); private: SDnode* pDnode; diff --git a/source/dnode/mgmt/test/sut/src/server.cpp b/source/dnode/mgmt/test/sut/src/server.cpp index 8318d25150..e8ccc700e2 100644 --- a/source/dnode/mgmt/test/sut/src/server.cpp +++ b/source/dnode/mgmt/test/sut/src/server.cpp @@ -22,22 +22,22 @@ void* serverLoop(void* param) { } } -SDndCfg TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) { - SDndCfg cfg = {0}; - cfg.numOfSupportVnodes = 16; - cfg.serverPort = port; - strcpy(cfg.dataDir, path); - snprintf(cfg.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port); - snprintf(cfg.localFqdn, TSDB_FQDN_LEN, "%s", fqdn); - snprintf(cfg.firstEp, TSDB_EP_LEN, "%s", firstEp); - return cfg; +SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) { + SDnodeOpt option = {0}; + option.numOfSupportVnodes = 16; + option.serverPort = port; + strcpy(option.dataDir, path); + snprintf(option.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port); + snprintf(option.localFqdn, TSDB_FQDN_LEN, "%s", fqdn); + snprintf(option.firstEp, TSDB_EP_LEN, "%s", firstEp); + return option; } bool TestServer::DoStart() { - SDndCfg cfg = BuildOption(path, fqdn, port, firstEp); + SDnodeOpt option = BuildOption(path, fqdn, port, firstEp); taosMkDir(path); - pDnode = dndCreate(&cfg); + pDnode = dndCreate(&option); if (pDnode != NULL) { return false; } diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index a3f1b3ad1f..3378cd4669 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -85,10 +85,10 @@ typedef struct { } SVnodeThread; // interface -void vmGetMgmtFp(SMgmtWrapper *pWrapper); -void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads); -void vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo); -void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo); +void vmGetMgmtFp(SMgmtWrapper *pWrapper); +void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads); +int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo); +void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo); // vmInt.h SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId); diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index c591a0f4dd..cbddca13a3 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -279,11 +279,11 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { taosInitRWLatch(&pMgmt->latch); SDiskCfg dCfg = {0}; - tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN); + tstrncpy(dCfg.dir, pDnode->dataDir, TSDB_FILENAME_LEN); dCfg.level = 0; dCfg.primary = 1; - SDiskCfg *pDisks = pDnode->cfg.pDisks; - int32_t numOfDisks = pDnode->cfg.numOfDisks; + SDiskCfg *pDisks = pDnode->pDisks; + int32_t numOfDisks = pDnode->numOfDisks; if (numOfDisks <= 0 || pDisks == NULL) { pDisks = &dCfg; numOfDisks = 1; @@ -342,11 +342,11 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) { pWrapper->fp = mgmtFp; } -void vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo) { +int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; + if (pMgmt == NULL) return -1; - tfsGetMonitorInfo(pMgmt->pTfs, pInfo); + return tfsGetMonitorInfo(pMgmt->pTfs, pInfo); } void vmGetVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) { diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 956e55557c..4888541f40 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -70,7 +70,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) { SWrapperCfg wrapperCfg = {0}; vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg); - if (createReq.dnodeId != dmGetDnodeId(pMgmt->pDnode)) { + if (createReq.dnodeId != pMgmt->pDnode->dnodeId) { terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION; dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 7b0f25d043..7b4a0233e2 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -119,11 +119,10 @@ typedef struct SMnode { SHashObj *infosMeta; SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; - SendReqToDnodeFp sendReqFp; - SendReqToMnodeFp sendReqToMnodeFp; - SendRedirectRspFp sendRedirectRspFp; - PutReqToMWriteQFp putReqToMWriteQFp; - PutReqToMReadQFp putReqToMReadQFp; + SendReqFp sendReqFp; + SendMnodeReqFp sendReqToMnodeFp; + PutToQueueFp putReqToMWriteQFp; + PutToQueueFp putReqToMReadQFp; } SMnode; int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index d5a629a5b0..c0d7e4ad35 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -291,9 +291,8 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp; pMnode->sendReqFp = pOption->sendReqFp; pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp; - pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp; - if (pMnode->sendReqFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->sendRedirectRspFp == NULL || + if (pMnode->sendReqFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->putReqToMWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { terrno = TSDB_CODE_MND_INVALID_OPTIONS; return -1; diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index cf1c5cf227..2c11cfcf32 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -389,7 +389,7 @@ static int32_t tfsMount(STfs *pTfs, SDiskCfg *pCfg) { } static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) { - char dirName[TSDB_FILENAME_LEN] = "\0"; + char dirName[TSDB_FILENAME_LEN] = "\0"; if (pCfg->level < 0 || pCfg->level >= TFS_MAX_TIERS) { fError("failed to mount %s to FS since invalid level %d", pCfg->dir, pCfg->level); @@ -539,9 +539,9 @@ static STfsDisk *tfsNextDisk(STfs *pTfs, SDiskIter *pIter) { return pDisk; } -void tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) { +int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) { pInfo->datadirs = taosArrayInit(32, sizeof(SMonDiskDesc)); - if (pInfo->datadirs == NULL) return; + if (pInfo->datadirs == NULL) return -1; tfsUpdateSize(pTfs); @@ -558,4 +558,6 @@ void tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) { } } tfsUnLock(pTfs); + + return 0; }