From e7ed5e3dd5411317b614566a070800f1c8a71743 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 14 Mar 2022 18:14:27 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/container/inc/dndInt.h | 32 +---- .../dnode/mgmt/container/inc/dndTransport.h | 5 +- source/dnode/mgmt/container/src/dndMonitor.c | 3 +- source/dnode/mgmt/container/src/dndNode.c | 120 ++++++++---------- .../dnode/mgmt/container/src/dndTransport.c | 37 +----- source/dnode/mgmt/dnode/inc/dmFile.h | 18 +-- source/dnode/mgmt/dnode/inc/dmHandle.h | 8 +- source/dnode/mgmt/dnode/inc/dmInt.h | 22 ++++ source/dnode/mgmt/dnode/inc/dmMgmt.h | 8 +- source/dnode/mgmt/dnode/src/dmFile.c | 73 +++++------ source/dnode/mgmt/dnode/src/dmMgmt.c | 5 +- source/dnode/mgmt/dnode/src/dmWorker.c | 6 +- 12 files changed, 140 insertions(+), 197 deletions(-) diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 5193c8bb3b..b45827fce6 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -61,18 +61,17 @@ typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStat; +typedef struct SDnodeMgmt SDnodeMgmt; + typedef struct SMgmtFp SMgmtFp; typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SMsgHandle SMsgHandle; + typedef void (*RpcMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps); typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); - - -typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMsg); typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); -typedef void (*CloseNodeFp)(SDnode *pDnode, SMgmtWrapper *pWrapper); +typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper); -typedef int32_t (*MgmtHandleMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg); typedef SMsgHandle (*GetMsgHandleFp)(SMgmtWrapper *pWrapper, int32_t msgIndex); typedef struct SMsgHandle { @@ -95,25 +94,6 @@ typedef struct { }; } SDnodeWorker; -typedef struct { - int32_t dnodeId; - int32_t dropped; - int64_t clusterId; - int64_t dver; - int64_t rebootTime; - int64_t updateTime; - int8_t statusSent; - SEpSet mnodeEpSet; - SHashObj *dnodeHash; - SArray *pDnodeEps; - pthread_t *threadId; - SRWLatch latch; - SDnodeWorker mgmtWorker; - SDnodeWorker statusWorker; - - // - SMsgHandle msgHandles[TDMT_MAX]; -} SDnodeMgmt; typedef struct { int32_t refCount; @@ -223,8 +203,8 @@ typedef struct SMgmtWrapper { EProcType procType; SProcObj *pProc; void *pMgmt; - SMgmtFp fp; SDnode *pDnode; + SMgmtFp fp; } SMgmtWrapper; typedef struct SDnode { @@ -234,12 +214,10 @@ typedef struct SDnode { SDndCfg cfg; SStartupReq startup; TdFilePtr pLockFile; - SDnodeMgmt dmgmt; STransMgmt tmgmt; STfs *pTfs; SMgmtFp fps[NODE_MAX]; SMgmtWrapper wrappers[NODE_MAX]; - char *path; } SDnode; EDndStatus dndGetStatus(SDnode *pDnode); diff --git a/source/dnode/mgmt/container/inc/dndTransport.h b/source/dnode/mgmt/container/inc/dndTransport.h index 1a925ee923..892dea6eb2 100644 --- a/source/dnode/mgmt/container/inc/dndTransport.h +++ b/source/dnode/mgmt/container/inc/dndTransport.h @@ -22,10 +22,11 @@ extern "C" { #endif -int32_t dndInitTrans(SDnode *pDnode); -void dndCleanupTrans(SDnode *pDnode); int32_t dndInitServer(SDnode *pDnode); +void dndCleanupServer(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode); +void dndCleanupClient(SDnode *pDnode); +int32_t dndSetMsgHandle(SDnode *pDnode); int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/container/src/dndMonitor.c b/source/dnode/mgmt/container/src/dndMonitor.c index 41665df6f6..6a0bc33936 100644 --- a/source/dnode/mgmt/container/src/dndMonitor.c +++ b/source/dnode/mgmt/container/src/dndMonitor.c @@ -37,6 +37,8 @@ static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { } static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { + +#if 0 pInfo->uptime = (taosGetTimestampMs() - pDnode->dmgmt.rebootTime) / (86400000.0f); taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system); pInfo->cpu_cores = tsNumOfCores; @@ -49,7 +51,6 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { taosGetCardInfo(&pInfo->net_in, &pInfo->net_out); taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); -#if 0 SVnodesStat *pStat = &pDnode->vmgmt.stat; pInfo->req_select = pStat->numOfSelectReqs; pInfo->req_insert = pStat->numOfInsertReqs; diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index a9bd696c6d..e80bd54f94 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -15,10 +15,10 @@ #define _DEFAULT_SOURCE #include "dndNode.h" -#include "dmMgmt.h" #include "dndTransport.h" #include "bmInt.h" +#include "dmInt.h" #include "mmInt.h" #include "qmInt.h" #include "smInt.h" @@ -43,6 +43,10 @@ static bool dndRequireNode(SMgmtWrapper *pMgmt) { return required; } +static int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); } + +static void dndCloseNode(SMgmtWrapper *pWrapper) { (*pWrapper->fp.closeFp)(pWrapper); } + static void dndClearMemory(SDnode *pDnode) { for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; @@ -51,26 +55,12 @@ static void dndClearMemory(SDnode *pDnode) { if (pDnode->pLockFile != NULL) { taosUnLockFile(pDnode->pLockFile); taosCloseFile(&pDnode->pLockFile); + pDnode->pLockFile = NULL; } - tfree(pDnode->path); + tfree(pDnode); dDebug("dnode object memory is cleared, data:%p", pDnode); } -static int32_t dndInitResource(SDnode *pDnode) { - - - - return 0; -} - -static void dndClearResource(SDnode *pDnode) { - dndCleanupTrans(pDnode); - dndStopMgmt(pDnode); - dndCleanupMgmt(pDnode); - tfsClose(pDnode->pTfs); - dDebug("dnode object resource is cleared, data:%p", pDnode); -} - SDnode *dndCreate(SDndCfg *pCfg) { dInfo("start to create dnode object"); int32_t code = -1; @@ -84,12 +74,18 @@ SDnode *dndCreate(SDndCfg *pCfg) { } dndSetStatus(pDnode, DND_STAT_INIT); + pDnode->pLockFile = dndCheckRunning(pCfg->dataDir); + if (pDnode->pLockFile == NULL) { + goto _OVER; + } - snprintf(path, sizeof(path), "%s%sdnode", pCfg->dataDir, TD_DIRSEP); - pDnode->path = strdup(path); - if (taosMkDir(path) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to create dir:%s since %s", path, terrstr()); + if (dndInitServer(pDnode) != 0) { + dError("failed to init trans server since %s", terrstr()); + goto _OVER; + } + + if (dndInitClient(pDnode) != 0) { + dError("failed to init trans client since %s", terrstr()); goto _OVER; } @@ -107,10 +103,15 @@ SDnode *dndCreate(SDndCfg *pCfg) { pDnode->wrappers[BNODE].name = "bnode"; memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg)); + if (dndSetMsgHandle(pDnode) != 0) { + goto _OVER; + } + for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; snprintf(path, sizeof(path), "%s%s%s", pCfg->dataDir, TD_DIRSEP, pDnode->wrappers[n].name); pWrapper->path = strdup(path); + pWrapper->pDnode = pDnode; if (pDnode->wrappers[n].path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; @@ -127,17 +128,11 @@ SDnode *dndCreate(SDndCfg *pCfg) { } } - pDnode->pLockFile = dndCheckRunning(pCfg->dataDir); - if (pDnode->pLockFile == NULL) { - goto _OVER; - } - code = 0; _OVER: if (code != 0 && pDnode) { dndClearMemory(pDnode); - tfree(pDnode); dError("failed to create dnode object since %s", terrstr()); } else { dInfo("dnode object is created, data:%p", pDnode); @@ -157,64 +152,53 @@ void dndClose(SDnode *pDnode) { dInfo("start to close dnode, data:%p", pDnode); dndSetStatus(pDnode, DND_STAT_STOPPED); - dndClearResource(pDnode); + dndCleanupServer(pDnode); + dndCleanupClient(pDnode); + + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + dndCloseNode(pWrapper); + } + dndClearMemory(pDnode); - tfree(pDnode); dInfo("dnode object is closed, data:%p", pDnode); } -static int32_t dndOpenNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { - // if (tsMultiProcess) { - // SProcCfg cfg = {0}; - // pWrapper->pProc = taosProcInit(&cfg); - // if (taosProcIsChild(pWrapper->pProc)) { - // pWrapper->procType = PROC_CHILD; - // dInfo("node:%s, will start in child process", pWrapper->name); - // } else { - // pWrapper->procType = PROC_PARENT; - // dInfo("node:%s, will start in parent process", pWrapper->name); - // } - // } else { - // pWrapper->procType = PROC_SINGLE; - // dInfo("node:%s, will start in single process mnode", pWrapper->name); - // } - - // if (pWrapper->procType == PROC_SINGLE || pWrapper->procType == PROC_CHILD) { - // SDndInfo info; - // pWrapper->pNode = (*pWrapper->fp.openFp)(pWrapper->path, &info); - // if (pWrapper != NULL) { - // return -1; - // } - // } - - // return 0; - - (*pWrapper->fp.openFp)(pWrapper); - return 0; -} - -static void dndClearNodeExecpt(SDnode *pDnode, SMgmtWrapper *pWrapper){} - static int32_t dndRunInSingleProcess(SDnode *pDnode) { dInfo("dnode run in single process mode"); + for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (!pWrapper->required) continue; + dInfo("node:%s, will start in single process", pWrapper->name); - if (dndOpenNode(pDnode, pWrapper) != 0) { + if (dndOpenNode(pWrapper) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); return -1; } } + return 0; } +static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { + dndCleanupServer(pDnode); + for (ENodeType n = 0; n < NODE_MAX; ++n) { + if (except == n) continue; + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + dndCloseNode(pWrapper); + } +} + static int32_t dndRunInMultiProcess(SDnode *pDnode) { for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (!pWrapper->required) continue; + if (n == DNODE) { dInfo("node:%s, will start in parent process", pWrapper->name); pWrapper->procType = PROC_PARENT; - if (dndOpenNode(pDnode, pWrapper) != 0) { + if (dndOpenNode(pWrapper) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); return -1; } @@ -236,17 +220,13 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { dndResetLog(pWrapper); dInfo("node:%s, clean up resources inherited from parent", pWrapper->name); - dndClearNodeExecpt(pDnode, pWrapper); - - dInfo("node:%s, init trans client in child process", pWrapper->name); - dndInitClient(pDnode); + dndClearNodesExecpt(pDnode, n); dInfo("node:%s, will be initialized in child process", pWrapper->name); - dndOpenNode(pDnode, pWrapper); + dndOpenNode(pWrapper); } else { dInfo("node:%s, will not start in parent process", pWrapper->name); pWrapper->procType = PROC_PARENT; - dndOpenNode(pDnode, pWrapper); } } diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index b36f4c7d3f..a9358066d1 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -15,9 +15,8 @@ #define _DEFAULT_SOURCE #include "dndTransport.h" -#include "dmMgmt.h" +#include "dmInt.h" #include "mmInt.h" -#include "dmHandle.h" #define INTERNAL_USER "_dnd" #define INTERNAL_CKEY "_key" @@ -47,7 +46,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { } } - int32_t dndInitClient(SDnode *pDnode) { +int32_t dndInitClient(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->tmgmt; SRpcInit rpcInit; @@ -208,7 +207,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char return rpcRsp.code; } - int32_t dndInitServer(SDnode *pDnode) { +int32_t dndInitServer(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->tmgmt; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); @@ -238,7 +237,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char return 0; } -static void dndCleanupServer(SDnode *pDnode) { +void dndCleanupServer(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->tmgmt; if (pMgmt->serverRpc) { rpcClose(pMgmt->serverRpc); @@ -247,7 +246,7 @@ static void dndCleanupServer(SDnode *pDnode) { } } -static int32_t dndSetMsgHandle(SDnode *pDnode) { +int32_t dndSetMsgHandle(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->tmgmt; for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) { @@ -274,32 +273,6 @@ static int32_t dndSetMsgHandle(SDnode *pDnode) { return 0; } -int32_t dndInitTrans(SDnode *pDnode) { - dInfo("dnode-transport start to init"); - - if (dndSetMsgHandle(pDnode) != 0) { - return -1; - } - - if (dndInitClient(pDnode) != 0) { - return -1; - } - - if (dndInitServer(pDnode) != 0) { - return -1; - } - - dInfo("dnode-transport is initialized"); - return 0; -} - -void dndCleanupTrans(SDnode *pDnode) { - dInfo("dnode-transport start to clean up"); - dndCleanupServer(pDnode); - dndCleanupClient(pDnode); - dInfo("dnode-transport is cleaned up"); -} - int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) { STransMgmt *pMgmt = &pDnode->tmgmt; if (pMgmt->clientRpc == NULL) { diff --git a/source/dnode/mgmt/dnode/inc/dmFile.h b/source/dnode/mgmt/dnode/inc/dmFile.h index 64cd00f9ff..91eb24a364 100644 --- a/source/dnode/mgmt/dnode/inc/dmFile.h +++ b/source/dnode/mgmt/dnode/inc/dmFile.h @@ -13,25 +13,21 @@ * along with this program. If not, see . */ -#ifndef _TD_DND_FILE_H_ -#define _TD_DND_FILE_H_ +#ifndef _TD_DND_DNODE_FILE_H_ +#define _TD_DND_DNODE_FILE_H_ -#include "dndInt.h" +#include "dmInt.h" #ifdef __cplusplus extern "C" { #endif -int32_t dmReadFile(SDnode *pDnode); -int32_t dmWriteFile(SDnode *pDnode); - -void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps); -void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps); -void dndPrintDnodes(SDnode *pDnode); -bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp); +int32_t dmReadFile(SDnodeMgmt *pMgmt); +int32_t dmWriteFile(SDnodeMgmt *pMgmt); +void dndUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps); #ifdef __cplusplus } #endif -#endif /*_TD_DND_FILE_H_*/ \ No newline at end of file +#endif /*_TD_DND_DNODE_FILE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/inc/dmHandle.h b/source/dnode/mgmt/dnode/inc/dmHandle.h index e9ae27ba73..0f97c20c4d 100644 --- a/source/dnode/mgmt/dnode/inc/dmHandle.h +++ b/source/dnode/mgmt/dnode/inc/dmHandle.h @@ -13,10 +13,10 @@ * along with this program. If not, see . */ -#ifndef _TD_DND_HADNLE_H_ -#define _TD_DND_HADNLE_H_ +#ifndef _TD_DND_DNODE_HADNLE_H_ +#define _TD_DND_DNODE_HADNLE_H_ -#include "dndInt.h" +#include "dmInt.h" #ifdef __cplusplus extern "C" { @@ -33,4 +33,4 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); } #endif -#endif /*_TD_DND_HADNLE_H_*/ \ No newline at end of file +#endif /*_TD_DND_DNODE_HADNLE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index af65949eaf..4b67f848cf 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -22,7 +22,29 @@ extern "C" { #endif +typedef struct SDnodeMgmt { + int32_t dnodeId; + int32_t dropped; + int64_t clusterId; + int64_t dver; + int64_t rebootTime; + int64_t updateTime; + int8_t statusSent; + SEpSet mnodeEpSet; + SHashObj *dnodeHash; + SArray *pDnodeEps; + pthread_t *threadId; + SRWLatch latch; + SDnodeWorker mgmtWorker; + SDnodeWorker statusWorker; + SMsgHandle msgHandles[TDMT_MAX]; + const char *path; + SDnode *pDnode; +} SDnodeMgmt; + SMgmtFp dmGetMgmtFp(); +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); +void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/inc/dmMgmt.h b/source/dnode/mgmt/dnode/inc/dmMgmt.h index c8649b932b..389b9af7da 100644 --- a/source/dnode/mgmt/dnode/inc/dmMgmt.h +++ b/source/dnode/mgmt/dnode/inc/dmMgmt.h @@ -13,10 +13,10 @@ * along with this program. If not, see . */ -#ifndef _TD_DND_MGMT_H_ -#define _TD_DND_MGMT_H_ +#ifndef _TD_DND_DNODE_MGMT_H_ +#define _TD_DND_DNODE_MGMT_H_ -#include "dndInt.h" +#include "dmInt.h" #ifdef __cplusplus extern "C" { @@ -38,4 +38,4 @@ void dndProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) ; } #endif -#endif /*_TD_DND_MGMT_H_*/ \ No newline at end of file +#endif /*_TD_DND_DNODE_MGMT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/src/dmFile.c b/source/dnode/mgmt/dnode/src/dmFile.c index 2adcf0bfa8..aeb0cc40aa 100644 --- a/source/dnode/mgmt/dnode/src/dmFile.c +++ b/source/dnode/mgmt/dnode/src/dmFile.c @@ -16,8 +16,18 @@ #define _DEFAULT_SOURCE #include "dmFile.h" -int32_t dmReadFile(SDnode *pDnode) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; +static void dndPrintDnodes(SDnodeMgmt *pMgmt); +static bool dndIsEpChanged(SDnodeMgmt *pMgmt, const char *ep); +static void dndResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps); + +int32_t dmReadFile(SDnodeMgmt *pMgmt) { + int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 256 * 1024; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + char file[PATH_MAX]; + TdFilePtr pFile = NULL; pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); if (pMgmt->pDnodeEps == NULL) { @@ -25,16 +35,8 @@ int32_t dmReadFile(SDnode *pDnode) { goto PRASE_DNODE_OVER; } - int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; - int32_t len = 0; - int32_t maxLen = 256 * 1024; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - - char file[PATH_MAX]; - snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->path, TD_DIRSEP); - - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); + snprintf(file, sizeof(file), "%s%sdnode.json", pMgmt->path, TD_DIRSEP); + pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { dDebug("file %s not exist", file); code = 0; @@ -128,36 +130,34 @@ int32_t dmReadFile(SDnode *pDnode) { code = 0; dInfo("succcessed to read file %s", file); - dndPrintDnodes(pDnode); + dndPrintDnodes(pMgmt); PRASE_DNODE_OVER: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); - if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->cfg.localEp)) { - dError("localEp %s different with %s and need reconfigured", pDnode->cfg.localEp, file); + if (dndIsEpChanged(pMgmt, pMgmt->pDnode->cfg.localEp)) { + dError("localEp %s different with %s and need reconfigured", pMgmt->pDnode->cfg.localEp, file); return -1; } if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) { SDnodeEp dnodeEp = {0}; dnodeEp.isMnode = 1; - taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &dnodeEp.ep); + taosGetFqdnPortFromEp(pMgmt->pDnode->cfg.firstEp, &dnodeEp.ep); taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); } - dndResetDnodes(pDnode, pMgmt->pDnodeEps); + dndResetDnodes(pMgmt, pMgmt->pDnodeEps); terrno = 0; return 0; } -int32_t dmWriteFile(SDnode *pDnode) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - +int32_t dmWriteFile(SDnodeMgmt *pMgmt) { char file[PATH_MAX]; - snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->path, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%sdnode.json.bak", pMgmt->path, TD_DIRSEP); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -197,7 +197,7 @@ int32_t dmWriteFile(SDnode *pDnode) { free(content); char realfile[PATH_MAX]; - snprintf(realfile, sizeof(realfile), "%s%smnode.json", pDnode->path, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; @@ -210,31 +210,28 @@ int32_t dmWriteFile(SDnode *pDnode) { return 0; } -void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps) { +void dndUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { int32_t numOfEps = taosArrayGetSize(pDnodeEps); if (numOfEps <= 0) return; - SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosWLockLatch(&pMgmt->latch); int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); if (numOfEps != numOfEpsOld) { - dndResetDnodes(pDnode, pDnodeEps); - dmWriteFile(pDnode); + dndResetDnodes(pMgmt, pDnodeEps); + dmWriteFile(pMgmt); } else { int32_t size = numOfEps * sizeof(SDnodeEp); if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) { - dndResetDnodes(pDnode, pDnodeEps); - dmWriteFile(pDnode); + dndResetDnodes(pMgmt, pDnodeEps); + dmWriteFile(pMgmt); } } taosWUnLockLatch(&pMgmt->latch); } -void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - +static void dndResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { if (pMgmt->pDnodeEps != pDnodeEps) { SArray *tmp = pMgmt->pDnodeEps; pMgmt->pDnodeEps = taosArrayDup(pDnodeEps); @@ -262,12 +259,10 @@ void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps) { taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); } - dndPrintDnodes(pDnode); + dndPrintDnodes(pMgmt); } -void dndPrintDnodes(SDnode *pDnode) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - +static void dndPrintDnodes(SDnodeMgmt *pMgmt) { int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); dDebug("print dnode ep list, num:%d", numOfEps); for (int32_t i = 0; i < numOfEps; i++) { @@ -276,17 +271,15 @@ void dndPrintDnodes(SDnode *pDnode) { } } -bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) { +static bool dndIsEpChanged(SDnodeMgmt *pMgmt, const char *ep) { bool changed = false; - - SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); - SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &pMgmt->dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { char epstr[TSDB_EP_LEN + 1]; snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); - changed = strcmp(pEp, epstr) != 0; + changed = strcmp(ep, epstr) != 0; } taosRUnLockLatch(&pMgmt->latch); diff --git a/source/dnode/mgmt/dnode/src/dmMgmt.c b/source/dnode/mgmt/dnode/src/dmMgmt.c index 51efdb9df6..4f541b8980 100644 --- a/source/dnode/mgmt/dnode/src/dmMgmt.c +++ b/source/dnode/mgmt/dnode/src/dmMgmt.c @@ -296,6 +296,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) { pMgmt->rebootTime = taosGetTimestampMs(); pMgmt->dropped = 0; pMgmt->clusterId = 0; + pMgmt->path = pWrapper->path; taosInitRWLatch(&pMgmt->latch); pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -305,7 +306,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) { return -1; } - if (dmReadFile(pWrapper->pDnode) != 0) { + if (dmReadFile(pMgmt) != 0) { dError("node:%s, failed to read file since %s", pWrapper->name, terrstr()); return -1; } @@ -352,7 +353,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) { #endif } -static void dmCleanup(SDnode *pDnode, SMgmtWrapper *pWrapper){ +static void dmCleanup(SMgmtWrapper *pWrapper){ } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index b99162830a..981222637f 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -18,10 +18,9 @@ #include "dndWorker.h" #include "dmHandle.h" - static void *dnodeThreadRoutine(void *param) { - SDnode *pDnode = param; - SDnodeMgmt *pMgmt = &pDnode->dmgmt; + SDnodeMgmt *pMgmt = param; + SDnode *pDnode = pMgmt->pDnode; int64_t lastStatusTime = taosGetTimestampMs(); int64_t lastMonitorTime = lastStatusTime; @@ -50,7 +49,6 @@ static void *dnodeThreadRoutine(void *param) { } } - static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { int32_t code = 0;