From a5a4da4d49a4782a62cb6f80f732130f26f0bada Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 14 May 2022 18:16:52 +0800 Subject: [PATCH] refactor: multi process mode --- source/dnode/mgmt/mgmt_bnode/src/bmHandle.c | 4 +- source/dnode/mgmt/mgmt_bnode/src/bmInt.c | 4 +- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 7 +- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 52 ++++---- source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 45 +++---- source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c | 32 ++--- source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 4 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 +- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 16 +-- source/dnode/mgmt/mgmt_qnode/src/qmHandle.c | 2 +- source/dnode/mgmt/mgmt_qnode/src/qmInt.c | 4 +- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 2 +- source/dnode/mgmt/mgmt_snode/src/smInt.c | 4 +- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 12 +- source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 24 ++-- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 76 +++++++----- source/dnode/mgmt/node_mgmt/src/dmRun.c | 21 +--- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 6 +- source/dnode/mgmt/node_util/inc/dmUtil.h | 75 ++++++----- .../{mgmt_dnode => node_util}/src/dmEps.c | 117 +++++++++--------- 20 files changed, 243 insertions(+), 266 deletions(-) rename source/dnode/mgmt/{mgmt_dnode => node_util}/src/dmEps.c (71%) diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c index d87f832262..ab2ca89092 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c @@ -52,9 +52,9 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { return -1; } - if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) { + if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pInput->dnodeId); + dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pInput->pData->dnodeId); return -1; } diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c index 45e0c32193..fbfd0e72e2 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c @@ -34,7 +34,7 @@ static void bmClose(SBnodeMgmt *pMgmt) { dInfo("bnode-mgmt is cleaned up"); } -int32_t bmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { +int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dInfo("bnode-mgmt start to init"); SBnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SBnodeMgmt)); if (pMgmt == NULL) { @@ -44,7 +44,7 @@ int32_t bmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->dnodeId; + pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.pMgmt = pMgmt; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 276d059579..a01e895913 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -24,6 +24,7 @@ extern "C" { typedef struct SDnodeMgmt { struct SDnode *pDnode; + SDnodeData *pData; SMsgCb msgCb; const char *path; const char *name; @@ -33,14 +34,8 @@ typedef struct SDnodeMgmt { ProcessCreateNodeFp processCreateNodeFp; ProcessDropNodeFp processDropNodeFp; IsNodeRequiredFp isNodeRequiredFp; - SDnodeData data; } SDnodeMgmt; -// dmEps.c -int32_t dmReadEps(SDnodeMgmt *pMgmt); -int32_t dmWriteEps(SDnodeMgmt *pMgmt); -void dmUpdateEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps); - // dmHandle.c SArray *dmGetMsgHandles(); void dmSendStatusReq(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 38f97d3600..ea5587879c 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -17,30 +17,30 @@ #include "dmInt.h" static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { - if (pMgmt->data.dnodeId == 0 || pMgmt->data.clusterId == 0) { + if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) { dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); - taosWLockLatch(&pMgmt->data.latch); - pMgmt->data.dnodeId = pCfg->dnodeId; - pMgmt->data.clusterId = pCfg->clusterId; - dmWriteEps(pMgmt); - taosWUnLockLatch(&pMgmt->data.latch); + taosWLockLatch(&pMgmt->pData->latch); + pMgmt->pData->dnodeId = pCfg->dnodeId; + pMgmt->pData->clusterId = pCfg->clusterId; + dmWriteEps(pMgmt->pData); + taosWUnLockLatch(&pMgmt->pData->latch); } } static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { if (pRsp->code != 0) { - if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->data.dropped && pMgmt->data.dnodeId > 0) { - dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->data.dnodeId); - pMgmt->data.dropped = 1; - dmWriteEps(pMgmt); + if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) { + dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId); + pMgmt->pData->dropped = 1; + dmWriteEps(pMgmt->pData); } } else { SStatusRsp statusRsp = {0}; if (pRsp->pCont != NULL && pRsp->contLen > 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { - pMgmt->data.dnodeVer = statusRsp.dnodeVer; + pMgmt->pData->dnodeVer = statusRsp.dnodeVer; dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); - dmUpdateEps(pMgmt, statusRsp.pDnodeEps); + dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); } rpcFreeCont(pRsp->pCont); tFreeSStatusRsp(&statusRsp); @@ -50,17 +50,17 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { void dmSendStatusReq(SDnodeMgmt *pMgmt) { SStatusReq req = {0}; - taosRLockLatch(&pMgmt->data.latch); + taosRLockLatch(&pMgmt->pData->latch); req.sver = tsVersion; - req.dnodeVer = pMgmt->data.dnodeVer; - req.dnodeId = pMgmt->data.dnodeId; - req.clusterId = pMgmt->data.clusterId; + req.dnodeVer = pMgmt->pData->dnodeVer; + req.dnodeId = pMgmt->pData->dnodeId; + req.clusterId = pMgmt->pData->clusterId; if (req.clusterId == 0) req.dnodeId = 0; - req.rebootTime = pMgmt->data.rebootTime; - req.updateTime = pMgmt->data.updateTime; + req.rebootTime = pMgmt->pData->rebootTime; + req.updateTime = pMgmt->pData->updateTime; req.numOfCores = tsNumOfCores; - req.numOfSupportVnodes = pMgmt->data.supportVnodes; - tstrncpy(req.dnodeEp, pMgmt->data.localEp, TSDB_EP_LEN); + req.numOfSupportVnodes = pMgmt->pData->supportVnodes; + tstrncpy(req.dnodeEp, pMgmt->pData->localEp, TSDB_EP_LEN); req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.checkTime = 0; @@ -69,24 +69,24 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); - taosRUnLockLatch(&pMgmt->data.latch); + taosRUnLockLatch(&pMgmt->pData->latch); SMonVloadInfo vinfo = {0}; dmGetVnodeLoads(pMgmt, &vinfo); req.pVloads = vinfo.pVloads; - pMgmt->data.unsyncedVgId = 0; - pMgmt->data.vndState = TAOS_SYNC_STATE_LEADER; + pMgmt->pData->unsyncedVgId = 0; + pMgmt->pData->vndState = TAOS_SYNC_STATE_LEADER; for (int32_t i = 0; i < taosArrayGetSize(req.pVloads); ++i) { SVnodeLoad *pLoad = taosArrayGet(req.pVloads, i); if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) { - pMgmt->data.unsyncedVgId = pLoad->vgId; - pMgmt->data.vndState = pLoad->syncState; + pMgmt->pData->unsyncedVgId = pLoad->vgId; + pMgmt->pData->vndState = pLoad->syncState; } } SMonMloadInfo minfo = {0}; dmGetMnodeLoads(pMgmt, &minfo); - pMgmt->data.mndState = minfo.load.syncState; + pMgmt->pData->mndState = minfo.load.syncState; int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index cc623f7d85..4c1ed1e9ad 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -27,12 +27,12 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { } static void dmStopMgmt(SDnodeMgmt *pMgmt) { - pMgmt->data.stopped = true; + pMgmt->pData->stopped = true; dmStopMonitorThread(pMgmt); dmStopStatusThread(pMgmt); } -static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { +static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dInfo("dnode-mgmt start to init"); SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt)); if (pMgmt == NULL) { @@ -40,18 +40,6 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) return -1; } - pMgmt->data.dnodeId = 0; - pMgmt->data.clusterId = 0; - pMgmt->data.dnodeVer = 0; - pMgmt->data.updateTime = 0; - pMgmt->data.rebootTime = taosGetTimestampMs(); - pMgmt->data.dropped = 0; - pMgmt->data.localEp = pInput->localEp; - pMgmt->data.localFqdn = pInput->localFqdn; - pMgmt->data.firstEp = pInput->firstEp; - pMgmt->data.secondEp = pInput->secondEp; - pMgmt->data.supportVnodes = pInput->supportVnodes; - pMgmt->data.serverPort = pInput->serverPort; pMgmt->pDnode = pInput->pDnode; pMgmt->msgCb = pInput->msgCb; pMgmt->path = pInput->path; @@ -59,21 +47,21 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) pMgmt->processCreateNodeFp = pInput->processCreateNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp; - taosInitRWLatch(&pMgmt->data.latch); + taosInitRWLatch(&pMgmt->pData->latch); - pMgmt->data.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - if (pMgmt->data.dnodeHash == NULL) { + pMgmt->pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pMgmt->pData->dnodeHash == NULL) { dError("failed to init dnode hash"); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (dmReadEps(pMgmt) != 0) { + if (dmReadEps(pMgmt->pData) != 0) { dError("failed to read file since %s", terrstr()); return -1; } - if (pMgmt->data.dropped) { + if (pMgmt->pData->dropped) { dError("dnode will not start since its already dropped"); return -1; } @@ -82,12 +70,11 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) return -1; } - if (udfStartUdfd(pMgmt->data.dnodeId) != 0) { + if (udfStartUdfd(pMgmt->pData->dnodeId) != 0) { dError("failed to start udfd"); } pOutput->pMgmt = pMgmt; - pOutput->mnodeEps = pMgmt->data.mnodeEps; dInfo("dnode-mgmt is initialized"); return 0; } @@ -96,16 +83,16 @@ static void dmCloseMgmt(SDnodeMgmt *pMgmt) { dInfo("dnode-mgmt start to clean up"); dmStopWorker(pMgmt); - taosWLockLatch(&pMgmt->data.latch); - if (pMgmt->data.dnodeEps != NULL) { - taosArrayDestroy(pMgmt->data.dnodeEps); - pMgmt->data.dnodeEps = NULL; + taosWLockLatch(&pMgmt->pData->latch); + if (pMgmt->pData->dnodeEps != NULL) { + taosArrayDestroy(pMgmt->pData->dnodeEps); + pMgmt->pData->dnodeEps = NULL; } - if (pMgmt->data.dnodeHash != NULL) { - taosHashCleanup(pMgmt->data.dnodeHash); - pMgmt->data.dnodeHash = NULL; + if (pMgmt->pData->dnodeHash != NULL) { + taosHashCleanup(pMgmt->pData->dnodeHash); + pMgmt->pData->dnodeHash = NULL; } - taosWUnLockLatch(&pMgmt->data.latch); + taosWUnLockLatch(&pMgmt->pData->latch); taosMemoryFree(pMgmt); dInfo("dnode-mgmt is cleaned up"); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c b/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c index ce7d722834..801ec89ac2 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmMonitor.c @@ -16,30 +16,30 @@ #define _DEFAULT_SOURCE #include "dmInt.h" -#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \ - if (!tsMultiProcess) { \ - SRpcMsg rsp = {0}; \ - SRpcMsg req = {.msgType = mtype}; \ - SEpSet epset = {.inUse = 0, .numOfEps = 1}; \ - tstrncpy(epset.eps[0].fqdn, pMgmt->data.localFqdn, TSDB_FQDN_LEN); \ - epset.eps[0].port = pMgmt->data.serverPort; \ - \ - rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \ - if (rsp.code == 0 && rsp.contLen > 0) { \ - func(rsp.pCont, rsp.contLen, pInfo); \ - } \ - rpcFreeCont(rsp.pCont); \ +#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \ + if (!tsMultiProcess) { \ + SRpcMsg rsp = {0}; \ + SRpcMsg req = {.msgType = mtype}; \ + SEpSet epset = {.inUse = 0, .numOfEps = 1}; \ + tstrncpy(epset.eps[0].fqdn, pMgmt->pData->localFqdn, TSDB_FQDN_LEN); \ + epset.eps[0].port = pMgmt->pData->serverPort; \ + \ + rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \ + if (rsp.code == 0 && rsp.contLen > 0) { \ + func(rsp.pCont, rsp.contLen, pInfo); \ + } \ + rpcFreeCont(rsp.pCont); \ } static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) { pInfo->protocol = 1; - pInfo->dnode_id = pMgmt->data.dnodeId; - pInfo->cluster_id = pMgmt->data.clusterId; + pInfo->dnode_id = pMgmt->pData->dnodeId; + pInfo->cluster_id = pMgmt->pData->clusterId; tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN); } static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) { - pInfo->uptime = (taosGetTimestampMs() - pMgmt->data.rebootTime) / (86400000.0f); + pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f); pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, MNODE); pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, QNODE); pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(pMgmt->pDnode, SNODE); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index af02e56f05..7f36beea0d 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -24,7 +24,7 @@ static void *dmStatusThreadFp(void *param) { while (1) { taosMsleep(200); - if (pMgmt->data.dropped || pMgmt->data.stopped) break; + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; int64_t curTime = taosGetTimestampMs(); float interval = (curTime - lastTime) / 1000.0f; @@ -45,7 +45,7 @@ static void *dmMonitorThreadFp(void *param) { while (1) { taosMsleep(200); - if (pMgmt->data.dropped || pMgmt->data.stopped) break; + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; int64_t curTime = taosGetTimestampMs(); float interval = (curTime - lastTime) / 1000.0f; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 5548a23c55..0629dc9123 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -81,7 +81,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { return -1; } - if (createReq.replica <= 1 || (createReq.dnodeId != pInput->dnodeId && pInput->dnodeId != 0)) { + if (createReq.replica <= 1 || (createReq.dnodeId != pInput->pData->dnodeId && pInput->pData->dnodeId != 0)) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create mnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 8815af2f22..4fa49fcf23 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -18,9 +18,9 @@ #include "wal.h" static bool mmDeployRequired(const SMgmtInputOpt *pInput) { - if (pInput->dnodeId > 0) return false; - if (pInput->clusterId > 0) return false; - if (strcmp(pInput->localEp, pInput->firstEp) != 0) return false; + if (pInput->pData->dnodeId > 0) return false; + if (pInput->pData->clusterId > 0) return false; + if (strcmp(pInput->pData->localEp, pInput->pData->firstEp) != 0) return false; return true; } @@ -44,8 +44,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu pOption->selfIndex = 0; SReplica *pReplica = &pOption->replicas[0]; pReplica->id = 1; - pReplica->port = pInput->serverPort; - tstrncpy(pReplica->fqdn, pInput->localFqdn, TSDB_FQDN_LEN); + pReplica->port = pInput->pData->serverPort; + tstrncpy(pReplica->fqdn, pInput->pData->localFqdn, TSDB_FQDN_LEN); pOption->deploy = true; pMgmt->selfIndex = pOption->selfIndex; @@ -118,7 +118,7 @@ static void mmClose(SMnodeMgmt *pMgmt) { dInfo("mnode-mgmt is cleaned up"); } -static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { +static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dInfo("mnode-mgmt start to init"); if (walInit() != 0) { dError("failed to init wal since %s", terrstr()); @@ -133,7 +133,7 @@ static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->dnodeId; + pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; @@ -181,7 +181,7 @@ static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } } - pOutput->dnodeId = pMgmt->dnodeId; + pInput->pData->dnodeId = pMgmt->dnodeId; pOutput->pMgmt = pMgmt; dInfo("mnode-mgmt is initialized"); return 0; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index c500176b15..06649b835e 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -52,7 +52,7 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { return -1; } - if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) { + if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create qnode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c index 5e86a30732..e215cebd45 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c @@ -34,7 +34,7 @@ static void qmClose(SQnodeMgmt *pMgmt) { dInfo("qnode-mgmt is cleaned up"); } -static int32_t qmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { +static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dInfo("qnode-mgmt start to init"); SQnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SQnodeMgmt)); if (pMgmt == NULL) { @@ -44,7 +44,7 @@ static int32_t qmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->dnodeId; + pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index b0f0b73cbc..d43129b271 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -52,7 +52,7 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { return -1; } - if (pInput->dnodeId != 0 && createReq.dnodeId != pInput->dnodeId) { + if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; dError("failed to create snode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 5b9f2fc7f7..620749e2a3 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -35,7 +35,7 @@ static void smClose(SSnodeMgmt *pMgmt) { dInfo("snode-mgmt is cleaned up"); } -int32_t smOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { +int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dInfo("snode-mgmt start to init"); SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt)); if (pMgmt == NULL) { @@ -45,7 +45,7 @@ int32_t smOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->dnodeId; + pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.pMgmt = pMgmt; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 635467559e..7851521092 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -244,7 +244,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { dInfo("vnode-mgmt is cleaned up"); } -static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { +static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { dInfo("vnode-mgmt start to init"); int32_t code = -1; @@ -253,7 +253,7 @@ static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->path = pInput->path; pMgmt->name = pInput->name; - pMgmt->dnodeId = pInput->dnodeId; + pMgmt->dnodeId = pInput->pData->dnodeId; pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue; @@ -266,11 +266,11 @@ static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { taosInitRWLatch(&pMgmt->latch); SDiskCfg dCfg = {0}; - tstrncpy(dCfg.dir, pInput->dataDir, TSDB_FILENAME_LEN); + tstrncpy(dCfg.dir, pInput->pData->dataDir, TSDB_FILENAME_LEN); dCfg.level = 0; dCfg.primary = 1; - SDiskCfg *pDisks = pInput->disks; - int32_t numOfDisks = pInput->numOfDisks; + SDiskCfg *pDisks = pInput->pData->disks; + int32_t numOfDisks = pInput->pData->numOfDisks; if (numOfDisks <= 0 || pDisks == NULL) { pDisks = &dCfg; numOfDisks = 1; @@ -333,7 +333,7 @@ _OVER: } static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) { - *required = pInput->supportVnodes > 0; + *required = pInput->pData->supportVnodes > 0; return 0; } diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 691741c2a4..235bb7c16c 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -128,7 +128,7 @@ typedef struct SDnode { SRWLatch latch; SEpSet mnodeEps; TdFilePtr lockfile; - SMgmtInputOpt input; + SDnodeData data; SMgmtWrapper wrappers[NODE_END]; } SDnode; @@ -140,16 +140,14 @@ void dmCloseNode(SMgmtWrapper *pWrapper); SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType); int32_t dmMarkWrapper(SMgmtWrapper *pWrapper); void dmReleaseWrapper(SMgmtWrapper *pWrapper); +SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper); void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); void dmSetEvent(SDnode *pDnode, EDndEvent event); void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc); - -void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); -void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); -int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); -int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); +void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); +void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); // dmProc.c int32_t dmInitProc(struct SMgmtWrapper *pWrapper); @@ -164,13 +162,13 @@ void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const EProcFuncType ftype); // dmTransport.c -int32_t dmInitServer(SDnode *pDnode); -void dmCleanupServer(SDnode *pDnode); -int32_t dmInitClient(SDnode *pDnode); -void dmCleanupClient(SDnode *pDnode); -SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper); -int32_t dmInitMsgHandle(SDnode *pDnode); -int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t dmInitServer(SDnode *pDnode); +void dmCleanupServer(SDnode *pDnode); +int32_t dmInitClient(SDnode *pDnode); +void dmCleanupClient(SDnode *pDnode); +SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper); +int32_t dmInitMsgHandle(SDnode *pDnode); +int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); // mgmt nodes SMgmtFunc dmGetMgmtFunc(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 7ecf1a2d44..cc45173324 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -19,12 +19,10 @@ static bool dmIsNodeRequired(SDnode *pDnode, EDndNodeType ntype) { return pDnode->wrappers[ntype].required; } static bool dmRequireNode(SMgmtWrapper *pWrapper) { - SMgmtInputOpt *pInput = &pWrapper->pDnode->input; - pInput->name = pWrapper->name; - pInput->path = pWrapper->path; + SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); bool required = false; - int32_t code = (*pWrapper->func.requiredFp)(pInput, &required); + int32_t code = (*pWrapper->func.requiredFp)(&input, &required); if (!required) { dDebug("node:%s, does not require startup", pWrapper->name); } @@ -55,24 +53,24 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { dInfo("dnode will run in child-process mode, node:%s", pWrapper->name); } - pDnode->input.dnodeId = 0; - pDnode->input.clusterId = 0; - pDnode->input.localEp = strdup(pOption->localEp); - pDnode->input.localFqdn = strdup(pOption->localFqdn); - pDnode->input.firstEp = strdup(pOption->firstEp); - pDnode->input.secondEp = strdup(pOption->secondEp); - pDnode->input.serverPort = pOption->serverPort; - pDnode->input.supportVnodes = pOption->numOfSupportVnodes; - pDnode->input.numOfDisks = pOption->numOfDisks; - pDnode->input.disks = pOption->disks; - pDnode->input.dataDir = strdup(pOption->dataDir); - pDnode->input.pDnode = pDnode; - pDnode->input.processCreateNodeFp = dmProcessCreateNodeReq; - pDnode->input.processDropNodeFp = dmProcessDropNodeReq; - pDnode->input.isNodeRequiredFp = dmIsNodeRequired; + pDnode->data.dnodeId = 0; + pDnode->data.clusterId = 0; + pDnode->data.dnodeVer = 0; + pDnode->data.updateTime = 0; + pDnode->data.rebootTime = taosGetTimestampMs(); + pDnode->data.dropped = 0; + pDnode->data.localEp = strdup(pOption->localEp); + pDnode->data.localFqdn = strdup(pOption->localFqdn); + pDnode->data.firstEp = strdup(pOption->firstEp); + pDnode->data.secondEp = strdup(pOption->secondEp); + pDnode->data.serverPort = pOption->serverPort; + pDnode->data.supportVnodes = pOption->numOfSupportVnodes; + pDnode->data.numOfDisks = pOption->numOfDisks; + pDnode->data.disks = pOption->disks; + pDnode->data.dataDir = strdup(pOption->dataDir); - if (pDnode->input.dataDir == NULL || pDnode->input.localEp == NULL || pDnode->input.localFqdn == NULL || - pDnode->input.firstEp == NULL || pDnode->input.secondEp == NULL) { + if (pDnode->data.dataDir == NULL || pDnode->data.localEp == NULL || pDnode->data.localFqdn == NULL || + pDnode->data.firstEp == NULL || pDnode->data.secondEp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -92,11 +90,11 @@ static void dmClearVars(SDnode *pDnode) { pDnode->lockfile = NULL; } - taosMemoryFreeClear(pDnode->input.localEp); - taosMemoryFreeClear(pDnode->input.localFqdn); - taosMemoryFreeClear(pDnode->input.firstEp); - taosMemoryFreeClear(pDnode->input.secondEp); - taosMemoryFreeClear(pDnode->input.dataDir); + taosMemoryFreeClear(pDnode->data.localEp); + taosMemoryFreeClear(pDnode->data.localFqdn); + taosMemoryFreeClear(pDnode->data.firstEp); + taosMemoryFreeClear(pDnode->data.secondEp); + taosMemoryFreeClear(pDnode->data.dataDir); taosThreadMutexDestroy(&pDnode->mutex); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); @@ -322,7 +320,7 @@ _OVER: rpcFreeCont(pReq->pCont); } -int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) { +static int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); if (pWrapper != NULL) { dmReleaseWrapper(pWrapper); @@ -340,12 +338,9 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs return -1; } - SMgmtInputOpt *pInput = &pWrapper->pDnode->input; - pInput->name = pWrapper->name; - pInput->path = pWrapper->path; - pInput->msgCb = dmGetMsgcb(pWrapper); + SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); - int32_t code = (*pWrapper->func.createFp)(pInput, pMsg); + int32_t code = (*pWrapper->func.createFp)(&input, pMsg); if (code != 0) { dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); } else { @@ -360,7 +355,7 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs return code; } -int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) { +static int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype); if (pWrapper == NULL) { terrno = TSDB_CODE_NODE_NOT_DEPLOYED; @@ -387,4 +382,19 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) } taosThreadMutexUnlock(&pDnode->mutex); return code; +} + +SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { + SMgmtInputOpt opt = { + .pDnode = pWrapper->pDnode, + .pData = &pWrapper->pDnode->data, + .processCreateNodeFp = dmProcessCreateNodeReq, + .processDropNodeFp = dmProcessDropNodeReq, + .isNodeRequiredFp = dmIsNodeRequired, + .name = pWrapper->name, + .path = pWrapper->path, + }; + + opt.msgCb = dmGetMsgcb(pWrapper); + return opt; } \ No newline at end of file diff --git a/source/dnode/mgmt/node_mgmt/src/dmRun.c b/source/dnode/mgmt/node_mgmt/src/dmRun.c index bc984ce82c..e4b709ba54 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmRun.c +++ b/source/dnode/mgmt/node_mgmt/src/dmRun.c @@ -92,17 +92,14 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { } SMgmtOutputOpt output = {0}; - SMgmtInputOpt *pInput = &pWrapper->pDnode->input; - pInput->name = pWrapper->name; - pInput->path = pWrapper->path; - pInput->msgCb = dmGetMsgcb(pWrapper); + SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); if (pWrapper->ntype == DNODE || OnlyInChildProc(pWrapper->proc.ptype)) { - tmsgSetDefaultMsgCb(&pInput->msgCb); + tmsgSetDefaultMsgCb(&input.msgCb); } if (OnlyInSingleProc(pWrapper->proc.ptype)) { - if ((*pWrapper->func.openFp)(pInput, &output) != 0) { + if ((*pWrapper->func.openFp)(&input, &output) != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); return -1; } @@ -111,7 +108,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { } if (InChildProc(pWrapper->proc.ptype)) { - if ((*pWrapper->func.openFp)(pInput, &output) != 0) { + if ((*pWrapper->func.openFp)(&input, &output) != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); return -1; } @@ -138,15 +135,9 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { dDebug("node:%s, has been opened in parent process", pWrapper->name); } - if (output.dnodeId != 0) { - pInput->dnodeId = output.dnodeId; - } if (output.pMgmt != NULL) { pWrapper->pMgmt = output.pMgmt; } - if (output.mnodeEps.numOfEps != 0) { - pWrapper->pDnode->mnodeEps = output.mnodeEps; - } dmReportStartup(pWrapper->pDnode, pWrapper->name, "openned"); return 0; @@ -254,8 +245,8 @@ static void dmWatchNodes(SDnode *pDnode) { taosThreadMutexLock(&pDnode->mutex); for (EDndNodeType ntype = DNODE + 1; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SProc *proc = &pWrapper->proc; - + SProc *proc = &pWrapper->proc; + if (!pWrapper->required) continue; if (!InParentProc(proc->ptype)) continue; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 9926c0fd8a..37ac3c89a1 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -240,7 +240,7 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); - if (strcmp(epSet.eps[i].fqdn, pDnode->input.localFqdn) == 0 && epSet.eps[i].port == pDnode->input.serverPort) { + if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) { epSet.inUse = (i + 1) % epSet.numOfEps; } @@ -453,8 +453,8 @@ int32_t dmInitServer(SDnode *pDnode) { SRpcInit rpcInit = {0}; - strncpy(rpcInit.localFqdn, pDnode->input.localFqdn, strlen(pDnode->input.localFqdn)); - rpcInit.localPort = pDnode->input.serverPort; + strncpy(rpcInit.localFqdn, pDnode->data.localFqdn, strlen(pDnode->data.localFqdn)); + rpcInit.localPort = pDnode->data.serverPort; rpcInit.label = "DND"; rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = (RpcCfp)dmProcessMsg; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index d76ed59897..2d997b3ad7 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -84,35 +84,50 @@ typedef int32_t (*ProcessCreateNodeFp)(struct SDnode *pDnode, EDndNodeType ntype typedef int32_t (*ProcessDropNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); typedef bool (*IsNodeRequiredFp)(struct SDnode *pDnode, EDndNodeType ntype); +typedef struct { + int32_t dnodeId; + int64_t clusterId; + int64_t dnodeVer; + int64_t updateTime; + int64_t rebootTime; + int32_t unsyncedVgId; + ESyncState vndState; + ESyncState mndState; + bool dropped; + bool stopped; + SEpSet mnodeEps; + SArray *dnodeEps; + SHashObj *dnodeHash; + SRWLatch latch; + SMsgCb msgCb; + const char *localEp; + const char *localFqdn; + const char *firstEp; + const char *secondEp; + int32_t supportVnodes; + uint16_t serverPort; + int32_t numOfDisks; + SDiskCfg *disks; + const char *dataDir; +} SDnodeData; + typedef struct { const char *path; const char *name; - SMsgCb msgCb; - int32_t dnodeId; - int64_t clusterId; - const char *localEp; - const char *firstEp; - const char *secondEp; - const char *localFqdn; - uint16_t serverPort; - int32_t supportVnodes; - int32_t numOfDisks; - SDiskCfg *disks; - const char *dataDir; struct SDnode *pDnode; + SDnodeData *pData; + SMsgCb msgCb; ProcessCreateNodeFp processCreateNodeFp; ProcessDropNodeFp processDropNodeFp; IsNodeRequiredFp isNodeRequiredFp; } SMgmtInputOpt; typedef struct { - int32_t dnodeId; - void *pMgmt; - SEpSet mnodeEps; + void *pMgmt; } SMgmtOutputOpt; typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg); -typedef int32_t (*NodeOpenFp)(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput); +typedef int32_t (*NodeOpenFp)(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput); typedef void (*NodeCloseFp)(void *pMgmt); typedef int32_t (*NodeStartFp)(void *pMgmt); typedef void (*NodeStopFp)(void *pMgmt); @@ -155,30 +170,10 @@ TdFilePtr dmCheckRunning(const char *dataDir); int32_t dmReadShmFile(const char *path, const char *name, EDndNodeType runType, SShm *pShm); int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm); -// common define -typedef struct { - int32_t dnodeId; - int64_t clusterId; - int64_t dnodeVer; - int64_t updateTime; - int64_t rebootTime; - int32_t unsyncedVgId; - ESyncState vndState; - ESyncState mndState; - bool dropped; - bool stopped; - SEpSet mnodeEps; - SArray *dnodeEps; - SHashObj *dnodeHash; - SRWLatch latch; - SMsgCb msgCb; - const char *localEp; - const char *localFqdn; - const char *firstEp; - const char *secondEp; - int32_t supportVnodes; - uint16_t serverPort; -} SDnodeData; +// dmEps.c +int32_t dmReadEps(SDnodeData *pData); +int32_t dmWriteEps(SDnodeData *pData); +void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c similarity index 71% rename from source/dnode/mgmt/mgmt_dnode/src/dmEps.c rename to source/dnode/mgmt/node_util/src/dmEps.c index 9ebb02b964..07fbe44efd 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -14,16 +14,16 @@ */ #define _DEFAULT_SOURCE -#include "dmInt.h" +#include "dmUtil.h" -static void dmPrintEps(SDnodeMgmt *pMgmt); -static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep); -static void dmResetEps(SDnodeMgmt *pMgmt, SArray *dnodeEps); +static void dmPrintEps(SDnodeData *pData); +static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep); +static void dmResetEps(SDnodeData *pData, SArray *dnodeEps); -static void dmGetDnodeEp(SDnodeMgmt *pMgmt, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { - taosRLockLatch(&pMgmt->data.latch); +static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { + taosRLockLatch(&pData->latch); - SDnodeEp *pDnodeEp = taosHashGet(pMgmt->data.dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { if (pPort != NULL) { *pPort = pDnodeEp->ep.port; @@ -36,10 +36,10 @@ static void dmGetDnodeEp(SDnodeMgmt *pMgmt, int32_t dnodeId, char *pEp, char *pF } } - taosRUnLockLatch(&pMgmt->data.latch); + taosRUnLockLatch(&pData->latch); } -int32_t dmReadEps(SDnodeMgmt *pMgmt) { +int32_t dmReadEps(SDnodeData *pData) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; int32_t maxLen = 256 * 1024; @@ -48,13 +48,13 @@ int32_t dmReadEps(SDnodeMgmt *pMgmt) { char file[PATH_MAX] = {0}; TdFilePtr pFile = NULL; - pMgmt->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); - if (pMgmt->data.dnodeEps == NULL) { + pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); + if (pData->dnodeEps == NULL) { dError("failed to calloc dnodeEp array since %s", strerror(errno)); goto _OVER; } - snprintf(file, sizeof(file), "%s%sdnode.json", pMgmt->path, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", pData->dataDir, TD_DIRSEP, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { code = 0; @@ -79,21 +79,21 @@ int32_t dmReadEps(SDnodeMgmt *pMgmt) { dError("failed to read %s since dnodeId not found", file); goto _OVER; } - pMgmt->data.dnodeId = dnodeId->valueint; + pData->dnodeId = dnodeId->valueint; cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { dError("failed to read %s since clusterId not found", file); goto _OVER; } - pMgmt->data.clusterId = atoll(clusterId->valuestring); + pData->clusterId = atoll(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", file); goto _OVER; } - pMgmt->data.dropped = dropped->valueint; + pData->dropped = dropped->valueint; cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); if (!dnodes || dnodes->type != cJSON_Array) { @@ -143,29 +143,29 @@ int32_t dmReadEps(SDnodeMgmt *pMgmt) { } dnodeEp.isMnode = isMnode->valueint; - taosArrayPush(pMgmt->data.dnodeEps, &dnodeEp); + taosArrayPush(pData->dnodeEps, &dnodeEp); } code = 0; dDebug("succcessed to read file %s", file); - dmPrintEps(pMgmt); + dmPrintEps(pData); _OVER: if (content != NULL) taosMemoryFree(content); if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); - if (taosArrayGetSize(pMgmt->data.dnodeEps) == 0) { + if (taosArrayGetSize(pData->dnodeEps) == 0) { SDnodeEp dnodeEp = {0}; dnodeEp.isMnode = 1; - taosGetFqdnPortFromEp(pMgmt->data.firstEp, &dnodeEp.ep); - taosArrayPush(pMgmt->data.dnodeEps, &dnodeEp); + taosGetFqdnPortFromEp(pData->firstEp, &dnodeEp.ep); + taosArrayPush(pData->dnodeEps, &dnodeEp); } - dmResetEps(pMgmt, pMgmt->data.dnodeEps); + dmResetEps(pData, pData->dnodeEps); - if (dmIsEpChanged(pMgmt, pMgmt->data.dnodeId, pMgmt->data.localEp)) { - dError("localEp %s different with %s and need reconfigured", pMgmt->data.localEp, file); + if (dmIsEpChanged(pData, pData->dnodeId, pData->localEp)) { + dError("localEp %s different with %s and need reconfigured", pData->localEp, file); return -1; } @@ -173,11 +173,12 @@ _OVER: return code; } -int32_t dmWriteEps(SDnodeMgmt *pMgmt) { +int32_t dmWriteEps(SDnodeData *pData) { char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%s%sdnode.json.bak", pMgmt->path, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pMgmt->path, TD_DIRSEP); + + snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", pData->dataDir, TD_DIRSEP, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", pData->dataDir, TD_DIRSEP, TD_DIRSEP); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -191,14 +192,14 @@ int32_t dmWriteEps(SDnodeMgmt *pMgmt) { char *content = taosMemoryCalloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->data.dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->data.clusterId); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->data.dropped); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pData->clusterId); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pData->dropped); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); - int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->data.dnodeEps); + int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps); for (int32_t i = 0; i < numOfEps; ++i) { - SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->data.dnodeEps, i); + SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i); len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn); len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port); @@ -222,41 +223,41 @@ int32_t dmWriteEps(SDnodeMgmt *pMgmt) { return -1; } - pMgmt->data.updateTime = taosGetTimestampMs(); + pData->updateTime = taosGetTimestampMs(); dDebug("successed to write %s", realfile); return 0; } -void dmUpdateEps(SDnodeMgmt *pMgmt, SArray *eps) { +void dmUpdateEps(SDnodeData *pData, SArray *eps) { int32_t numOfEps = taosArrayGetSize(eps); if (numOfEps <= 0) return; - taosWLockLatch(&pMgmt->data.latch); + taosWLockLatch(&pData->latch); - int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->data.dnodeEps); + int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps); if (numOfEps != numOfEpsOld) { - dmResetEps(pMgmt, eps); - dmWriteEps(pMgmt); + dmResetEps(pData, eps); + dmWriteEps(pData); } else { int32_t size = numOfEps * sizeof(SDnodeEp); - if (memcmp(pMgmt->data.dnodeEps->pData, eps->pData, size) != 0) { - dmResetEps(pMgmt, eps); - dmWriteEps(pMgmt); + if (memcmp(pData->dnodeEps->pData, eps->pData, size) != 0) { + dmResetEps(pData, eps); + dmWriteEps(pData); } } - taosWUnLockLatch(&pMgmt->data.latch); + taosWUnLockLatch(&pData->latch); } -static void dmResetEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) { - if (pMgmt->data.dnodeEps != dnodeEps) { - SArray *tmp = pMgmt->data.dnodeEps; - pMgmt->data.dnodeEps = taosArrayDup(dnodeEps); +static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) { + if (pData->dnodeEps != dnodeEps) { + SArray *tmp = pData->dnodeEps; + pData->dnodeEps = taosArrayDup(dnodeEps); taosArrayDestroy(tmp); } - pMgmt->data.mnodeEps.inUse = 0; - pMgmt->data.mnodeEps.numOfEps = 0; + pData->mnodeEps.inUse = 0; + pData->mnodeEps.numOfEps = 0; int32_t mIndex = 0; int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps); @@ -265,35 +266,35 @@ static void dmResetEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) { SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i); if (!pDnodeEp->isMnode) continue; if (mIndex >= TSDB_MAX_REPLICA) continue; - pMgmt->data.mnodeEps.numOfEps++; + pData->mnodeEps.numOfEps++; - pMgmt->data.mnodeEps.eps[mIndex] = pDnodeEp->ep; + pData->mnodeEps.eps[mIndex] = pDnodeEp->ep; mIndex++; } for (int32_t i = 0; i < numOfEps; i++) { SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i); - taosHashPut(pMgmt->data.dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); + taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); } - dmPrintEps(pMgmt); + dmPrintEps(pData); } -static void dmPrintEps(SDnodeMgmt *pMgmt) { - int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->data.dnodeEps); +static void dmPrintEps(SDnodeData *pData) { + int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps); dDebug("print dnode ep list, num:%d", numOfEps); for (int32_t i = 0; i < numOfEps; i++) { - SDnodeEp *pEp = taosArrayGet(pMgmt->data.dnodeEps, i); + SDnodeEp *pEp = taosArrayGet(pData->dnodeEps, i); dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode); } } -static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep) { +static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { bool changed = false; if (dnodeId == 0) return changed; - taosRLockLatch(&pMgmt->data.latch); + taosRLockLatch(&pData->latch); - SDnodeEp *pDnodeEp = taosHashGet(pMgmt->data.dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { char epstr[TSDB_EP_LEN + 1] = {0}; snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); @@ -303,6 +304,6 @@ static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep) { } } - taosRUnLockLatch(&pMgmt->data.latch); + taosRUnLockLatch(&pData->latch); return changed; }