From 20b69c5e617cc9e02737d768e58c5dc0baa9cd64 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 13 Mar 2022 20:32:31 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/bnode/inc/dndBnode.h | 2 +- source/dnode/mgmt/bnode/src/dndBnode.c | 3 + source/dnode/mgmt/dnode/inc/dndFile.h | 37 +++ source/dnode/mgmt/dnode/inc/dndInt.h | 2 +- source/dnode/mgmt/dnode/src/dndFile.c | 294 +++++++++++++++++ source/dnode/mgmt/dnode/src/dndMain.c | 34 +- source/dnode/mgmt/dnode/src/dndMgmt.c | 305 ++---------------- source/dnode/mgmt/dnode/src/dndMonitor.c | 5 +- source/dnode/mgmt/mnode/inc/mm.h | 2 +- source/dnode/mgmt/mnode/src/mmFile.c | 3 + source/dnode/mgmt/mnode/src/mmHandle.c | 3 + source/dnode/mgmt/mnode/src/mmMgmt.c | 3 + source/dnode/mgmt/mnode/src/mmWorker.c | 3 + .../dnode/mgmt/qnode/{src => inc}/dndQnode.h | 2 +- .../dnode/mgmt/qnode/{inc => src}/dndQnode.c | 3 + source/dnode/mgmt/snode/inc/dndSnode.h | 2 +- source/dnode/mgmt/snode/src/dndSnode.c | 3 + source/dnode/mgmt/vnode/inc/dndVnodes.h | 2 +- source/dnode/mgmt/vnode/src/dndVnodes.c | 3 + 19 files changed, 403 insertions(+), 308 deletions(-) create mode 100644 source/dnode/mgmt/dnode/inc/dndFile.h create mode 100644 source/dnode/mgmt/dnode/src/dndFile.c rename source/dnode/mgmt/qnode/{src => inc}/dndQnode.h (98%) rename source/dnode/mgmt/qnode/{inc => src}/dndQnode.c (99%) diff --git a/source/dnode/mgmt/bnode/inc/dndBnode.h b/source/dnode/mgmt/bnode/inc/dndBnode.h index 080cd2e487..853b54ff69 100644 --- a/source/dnode/mgmt/bnode/inc/dndBnode.h +++ b/source/dnode/mgmt/bnode/inc/dndBnode.h @@ -19,7 +19,7 @@ #ifdef __cplusplus extern "C" { #endif -#include "dndEnv.h" +#include "dndInt.h" int32_t dndInitBnode(SDnode *pDnode); void dndCleanupBnode(SDnode *pDnode); diff --git a/source/dnode/mgmt/bnode/src/dndBnode.c b/source/dnode/mgmt/bnode/src/dndBnode.c index 81b020c152..8152107150 100644 --- a/source/dnode/mgmt/bnode/src/dndBnode.c +++ b/source/dnode/mgmt/bnode/src/dndBnode.c @@ -19,6 +19,7 @@ #include "dndTransport.h" #include "dndWorker.h" +#if 0 static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); static SBnode *dndAcquireBnode(SDnode *pDnode) { @@ -390,3 +391,5 @@ void dndCleanupBnode(SDnode *pDnode) { pMgmt->pBnode = NULL; } } + +#endif \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/inc/dndFile.h b/source/dnode/mgmt/dnode/inc/dndFile.h new file mode 100644 index 0000000000..07049d04b9 --- /dev/null +++ b/source/dnode/mgmt/dnode/inc/dndFile.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_FILE_H_ +#define _TD_DND_FILE_H_ + +#include "dndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dndReadFile(SDnode *pDnode); +int32_t dndWriteFile(SDnode *pDnode); + +void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps); +void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps); +void dndPrintDnodes(SDnode *pDnode); +bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_FILE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/inc/dndInt.h b/source/dnode/mgmt/dnode/inc/dndInt.h index 907890cd23..d0b74834d3 100644 --- a/source/dnode/mgmt/dnode/inc/dndInt.h +++ b/source/dnode/mgmt/dnode/inc/dndInt.h @@ -94,7 +94,6 @@ typedef struct { int64_t updateTime; int8_t statusSent; SEpSet mnodeEpSet; - char *file; SHashObj *dnodeHash; SArray *pDnodeEps; pthread_t *threadId; @@ -207,6 +206,7 @@ typedef struct SDnode { STfs *pTfs; SMgmtFp fps[NODE_MAX]; SMgmtWrapper mgmts[NODE_MAX]; + char *path; } SDnode; EDndStatus dndGetStatus(SDnode *pDnode); diff --git a/source/dnode/mgmt/dnode/src/dndFile.c b/source/dnode/mgmt/dnode/src/dndFile.c new file mode 100644 index 0000000000..f903fc6e33 --- /dev/null +++ b/source/dnode/mgmt/dnode/src/dndFile.c @@ -0,0 +1,294 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndFile.h" + +int32_t dndReadFile(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + + pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); + if (pMgmt->pDnodeEps == NULL) { + dError("failed to calloc dnodeEp array since %s", strerror(errno)); + goto PRASE_DNODE_OVER; + } + + int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 256 * 1024; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->path, TD_DIRSEP); + + TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); + if (pFile == NULL) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_DNODE_OVER; + } + + len = (int32_t)taosReadFile(pFile, content, maxLen); + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_DNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_DNODE_OVER; + } + + cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); + if (!dnodeId || dnodeId->type != cJSON_Number) { + dError("failed to read %s since dnodeId not found", file); + goto PRASE_DNODE_OVER; + } + pMgmt->dnodeId = dnodeId->valueint; + + cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); + if (!clusterId || clusterId->type != cJSON_String) { + dError("failed to read %s since clusterId not found", file); + goto PRASE_DNODE_OVER; + } + pMgmt->clusterId = atoll(clusterId->valuestring); + + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_Number) { + dError("failed to read %s since dropped not found", file); + goto PRASE_DNODE_OVER; + } + pMgmt->dropped = dropped->valueint; + + cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); + if (!dnodes || dnodes->type != cJSON_Array) { + dError("failed to read %s since dnodes not found", file); + goto PRASE_DNODE_OVER; + } + + int32_t numOfDnodes = cJSON_GetArraySize(dnodes); + if (numOfDnodes <= 0) { + dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes); + goto PRASE_DNODE_OVER; + } + + for (int32_t i = 0; i < numOfDnodes; ++i) { + cJSON *node = cJSON_GetArrayItem(dnodes, i); + if (node == NULL) break; + + SDnodeEp dnodeEp = {0}; + + cJSON *did = cJSON_GetObjectItem(node, "id"); + if (!did || did->type != cJSON_Number) { + dError("failed to read %s since dnodeId not found", file); + goto PRASE_DNODE_OVER; + } + + dnodeEp.id = dnodeId->valueint; + + cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); + if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { + dError("failed to read %s since dnodeFqdn not found", file); + goto PRASE_DNODE_OVER; + } + tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); + + cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); + if (!dnodePort || dnodePort->type != cJSON_Number) { + dError("failed to read %s since dnodePort not found", file); + goto PRASE_DNODE_OVER; + } + + dnodeEp.ep.port = dnodePort->valueint; + + cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); + if (!isMnode || isMnode->type != cJSON_Number) { + dError("failed to read %s since isMnode not found", file); + goto PRASE_DNODE_OVER; + } + dnodeEp.isMnode = isMnode->valueint; + + taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); + } + + code = 0; + dInfo("succcessed to read file %s", file); + dndPrintDnodes(pDnode); + +PRASE_DNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (pFile != NULL) taosCloseFile(&pFile); + + if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->cfg.localEp)) { + dError("localEp %s different with %s and need reconfigured", pDnode->cfg.localEp, file); + return -1; + } + + if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) { + SDnodeEp dnodeEp = {0}; + dnodeEp.isMnode = 1; + taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &dnodeEp.ep); + taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); + } + + dndResetDnodes(pDnode, pMgmt->pDnodeEps); + + terrno = 0; + return 0; +} + +int32_t dndWriteFile(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->path, TD_DIRSEP); + + TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + dError("failed to write %s since %s", file, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 256 * 1024; + char *content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); + + int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); + for (int32_t i = 0; i < numOfEps; ++i) { + SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->pDnodeEps, i); + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port); + len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode); + if (i < numOfEps - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + taosWriteFile(pFile, content, len); + taosFsyncFile(pFile); + taosCloseFile(&pFile); + free(content); + + char realfile[PATH_MAX]; + snprintf(realfile, sizeof(realfile), "%s%smnode.json", pDnode->path, TD_DIRSEP); + + if (taosRenameFile(file, realfile) != 0) { + terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; + dError("failed to rename %s since %s", file, terrstr()); + return -1; + } + + pMgmt->updateTime = taosGetTimestampMs(); + dDebug("successed to write %s", file); + return 0; +} + +void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps) { + int32_t numOfEps = taosArrayGetSize(pDnodeEps); + if (numOfEps <= 0) return; + + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + taosWLockLatch(&pMgmt->latch); + + int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); + if (numOfEps != numOfEpsOld) { + dndResetDnodes(pDnode, pDnodeEps); + dndWriteFile(pDnode); + } else { + int32_t size = numOfEps * sizeof(SDnodeEp); + if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) { + dndResetDnodes(pDnode, pDnodeEps); + dndWriteFile(pDnode); + } + } + + taosWUnLockLatch(&pMgmt->latch); +} + +void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + + if (pMgmt->pDnodeEps != pDnodeEps) { + SArray *tmp = pMgmt->pDnodeEps; + pMgmt->pDnodeEps = taosArrayDup(pDnodeEps); + taosArrayDestroy(tmp); + } + + pMgmt->mnodeEpSet.inUse = 0; + pMgmt->mnodeEpSet.numOfEps = 0; + + int32_t mIndex = 0; + int32_t numOfEps = (int32_t)taosArrayGetSize(pDnodeEps); + + for (int32_t i = 0; i < numOfEps; i++) { + SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i); + if (!pDnodeEp->isMnode) continue; + if (mIndex >= TSDB_MAX_REPLICA) continue; + pMgmt->mnodeEpSet.numOfEps++; + + pMgmt->mnodeEpSet.eps[mIndex] = pDnodeEp->ep; + mIndex++; + } + + for (int32_t i = 0; i < numOfEps; i++) { + SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i); + taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); + } + + dndPrintDnodes(pDnode); +} + +void dndPrintDnodes(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + + int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); + dDebug("print dnode ep list, num:%d", numOfEps); + for (int32_t i = 0; i < numOfEps; i++) { + SDnodeEp *pEp = taosArrayGet(pMgmt->pDnodeEps, i); + dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode); + } +} + +bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) { + bool changed = false; + + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + taosRLockLatch(&pMgmt->latch); + + SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); + if (pDnodeEp != NULL) { + char epstr[TSDB_EP_LEN + 1]; + snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); + changed = strcmp(pEp, epstr) != 0; + } + + taosRUnLockLatch(&pMgmt->latch); + return changed; +} diff --git a/source/dnode/mgmt/dnode/src/dndMain.c b/source/dnode/mgmt/dnode/src/dndMain.c index 36ca3aada0..caa6961c88 100644 --- a/source/dnode/mgmt/dnode/src/dndMain.c +++ b/source/dnode/mgmt/dnode/src/dndMain.c @@ -82,6 +82,7 @@ static void dndClearDnodeMem(SDnode *pDnode) { taosUnLockFile(pDnode->pLockFile); taosCloseFile(&pDnode->pLockFile); } + tfree(pDnode->path); dDebug("dnode object memory is cleared, data:%p", pDnode); } @@ -140,6 +141,15 @@ SDnode *dndCreate(SDndCfg *pCfg) { } dndSetStatus(pDnode, DND_STAT_INIT); + + snprintf(path, sizeof(path), "%s%sdnode", pCfg->dataDir, TD_DIRSEP); + pDnode->path = strdup(path); + if (taosMkDir(path) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to create dir:%s since %s", path, terrstr()); + goto _OVER; + } + pDnode->mgmts[MNODE].fp = mmGetNodeFp(); pDnode->mgmts[VNODES].fp = vndGetNodeFp(); pDnode->mgmts[QNODE].fp = qndGetNodeFp(); @@ -177,13 +187,6 @@ SDnode *dndCreate(SDndCfg *pCfg) { goto _OVER; } - snprintf(path, sizeof(path), "%s%sdnode", pCfg->dataDir, TD_DIRSEP); - if (taosMkDir(path) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to create dir:%s since %s", path, terrstr()); - goto _OVER; - } - _OVER: if (code != 0 && pDnode) { dndClearDnodeMem(pDnode); @@ -285,14 +288,15 @@ int32_t dndInit() { return -1; } - SVnodeOpt vnodeOpt = { - .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = dndSendReqToDnode}; - if (vnodeInit(&vnodeOpt) != 0) { - dError("failed to init vnode since %s", terrstr()); - dndCleanup(); - return -1; - } + // SVnodeOpt vnodeOpt = { + // .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = dndSendReqToDnode}; + + // if (vnodeInit(&vnodeOpt) != 0) { + // dError("failed to init vnode since %s", terrstr()); + // dndCleanup(); + // return -1; + // } SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn, .comp = tsMonitorComp}; if (monInit(&monCfg) != 0) { @@ -312,7 +316,7 @@ void dndCleanup() { } walCleanUp(); - vnodeCleanup(); + // vnodeCleanup(); rpcCleanup(); monCleanup(); diff --git a/source/dnode/mgmt/dnode/src/dndMgmt.c b/source/dnode/mgmt/dnode/src/dndMgmt.c index 880c4dd040..4822c91d20 100644 --- a/source/dnode/mgmt/dnode/src/dndMgmt.c +++ b/source/dnode/mgmt/dnode/src/dndMgmt.c @@ -15,21 +15,21 @@ #define _DEFAULT_SOURCE #include "dndMgmt.h" -#include "dndBnode.h" -#include "mm.h" -#include "dndQnode.h" -#include "dndSnode.h" -#include "dndTransport.h" -#include "dndVnodes.h" -#include "dndWorker.h" -#include "monitor.h" - #include "dndMonitor.h" +// #include "dndBnode.h" +// #include "mm.h" +// #include "dndQnode.h" +// #include "dndSnode.h" +#include "dndTransport.h" +// #include "dndVnodes.h" +#include "dndWorker.h" +// #include "monitor.h" +#if 0 static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); -static int32_t dndReadDnodes(SDnode *pDnode); -static int32_t dndWriteDnodes(SDnode *pDnode); +static int32_t dndReadFile(SDnode *pDnode); +static int32_t dndWriteFile(SDnode *pDnode); static void *dnodeThreadRoutine(void *param); static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq); @@ -113,246 +113,6 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { taosWUnLockLatch(&pMgmt->latch); } -static void dndPrintDnodes(SDnode *pDnode) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - - int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); - dDebug("print dnode ep list, num:%d", numOfEps); - for (int32_t i = 0; i < numOfEps; i++) { - SDnodeEp *pEp = taosArrayGet(pMgmt->pDnodeEps, i); - dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode); - } -} - -static void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - - if (pMgmt->pDnodeEps != pDnodeEps) { - SArray *tmp = pMgmt->pDnodeEps; - pMgmt->pDnodeEps = taosArrayDup(pDnodeEps); - taosArrayDestroy(tmp); - } - - pMgmt->mnodeEpSet.inUse = 0; - pMgmt->mnodeEpSet.numOfEps = 0; - - int32_t mIndex = 0; - int32_t numOfEps = (int32_t)taosArrayGetSize(pDnodeEps); - - for (int32_t i = 0; i < numOfEps; i++) { - SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i); - if (!pDnodeEp->isMnode) continue; - if (mIndex >= TSDB_MAX_REPLICA) continue; - pMgmt->mnodeEpSet.numOfEps++; - - pMgmt->mnodeEpSet.eps[mIndex] = pDnodeEp->ep; - mIndex++; - } - - for (int32_t i = 0; i < numOfEps; i++) { - SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i); - taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); - } - - dndPrintDnodes(pDnode); -} - -static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) { - bool changed = false; - - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - taosRLockLatch(&pMgmt->latch); - - SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); - if (pDnodeEp != NULL) { - char epstr[TSDB_EP_LEN + 1]; - snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); - changed = strcmp(pEp, epstr) != 0; - } - - taosRUnLockLatch(&pMgmt->latch); - return changed; -} - -static int32_t dndReadDnodes(SDnode *pDnode) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - - pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); - if (pMgmt->pDnodeEps == NULL) { - dError("failed to calloc dnodeEp array since %s", strerror(errno)); - goto PRASE_DNODE_OVER; - } - - int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; - int32_t len = 0; - int32_t maxLen = 256 * 1024; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - - TdFilePtr pFile = taosOpenFile(pMgmt->file, TD_FILE_READ); - if (pFile == NULL) { - dDebug("file %s not exist", pMgmt->file); - code = 0; - goto PRASE_DNODE_OVER; - } - - len = (int32_t)taosReadFile(pFile, content, maxLen); - if (len <= 0) { - dError("failed to read %s since content is null", pMgmt->file); - goto PRASE_DNODE_OVER; - } - - content[len] = 0; - root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read %s since invalid json format", pMgmt->file); - goto PRASE_DNODE_OVER; - } - - cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); - if (!dnodeId || dnodeId->type != cJSON_Number) { - dError("failed to read %s since dnodeId not found", pMgmt->file); - goto PRASE_DNODE_OVER; - } - pMgmt->dnodeId = dnodeId->valueint; - - cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); - if (!clusterId || clusterId->type != cJSON_String) { - dError("failed to read %s since clusterId not found", pMgmt->file); - goto PRASE_DNODE_OVER; - } - pMgmt->clusterId = atoll(clusterId->valuestring); - - cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", pMgmt->file); - goto PRASE_DNODE_OVER; - } - pMgmt->dropped = dropped->valueint; - - cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); - if (!dnodes || dnodes->type != cJSON_Array) { - dError("failed to read %s since dnodes not found", pMgmt->file); - goto PRASE_DNODE_OVER; - } - - int32_t numOfDnodes = cJSON_GetArraySize(dnodes); - if (numOfDnodes <= 0) { - dError("failed to read %s since numOfDnodes:%d invalid", pMgmt->file, numOfDnodes); - goto PRASE_DNODE_OVER; - } - - for (int32_t i = 0; i < numOfDnodes; ++i) { - cJSON *node = cJSON_GetArrayItem(dnodes, i); - if (node == NULL) break; - - SDnodeEp dnodeEp = {0}; - - cJSON *did = cJSON_GetObjectItem(node, "id"); - if (!did || did->type != cJSON_Number) { - dError("failed to read %s since dnodeId not found", pMgmt->file); - goto PRASE_DNODE_OVER; - } - - dnodeEp.id = dnodeId->valueint; - - cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); - if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { - dError("failed to read %s since dnodeFqdn not found", pMgmt->file); - goto PRASE_DNODE_OVER; - } - tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); - - cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); - if (!dnodePort || dnodePort->type != cJSON_Number) { - dError("failed to read %s since dnodePort not found", pMgmt->file); - goto PRASE_DNODE_OVER; - } - - dnodeEp.ep.port = dnodePort->valueint; - - cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); - if (!isMnode || isMnode->type != cJSON_Number) { - dError("failed to read %s since isMnode not found", pMgmt->file); - goto PRASE_DNODE_OVER; - } - dnodeEp.isMnode = isMnode->valueint; - - taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); - } - - code = 0; - dInfo("succcessed to read file %s", pMgmt->file); - dndPrintDnodes(pDnode); - -PRASE_DNODE_OVER: - if (content != NULL) free(content); - if (root != NULL) cJSON_Delete(root); - if (pFile != NULL) taosCloseFile(&pFile); - - if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->cfg.localEp)) { - dError("localEp %s different with %s and need reconfigured", pDnode->cfg.localEp, pMgmt->file); - return -1; - } - - if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) { - SDnodeEp dnodeEp = {0}; - dnodeEp.isMnode = 1; - taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &dnodeEp.ep); - taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); - } - - dndResetDnodes(pDnode, pMgmt->pDnodeEps); - - terrno = 0; - return 0; -} - -static int32_t dndWriteDnodes(SDnode *pDnode) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - - TdFilePtr pFile = taosOpenFile(pMgmt->file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - dError("failed to write %s since %s", pMgmt->file, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - int32_t len = 0; - int32_t maxLen = 256 * 1024; - char *content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); - len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); - - int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); - for (int32_t i = 0; i < numOfEps; ++i) { - SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->pDnodeEps, i); - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port); - len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode); - if (i < numOfEps - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); - } - } - len += snprintf(content + len, maxLen - len, "}\n"); - - taosWriteFile(pFile, content, len); - taosFsyncFile(pFile); - taosCloseFile(&pFile); - free(content); - terrno = 0; - - pMgmt->updateTime = taosGetTimestampMs(); - dDebug("successed to write %s", pMgmt->file); - return 0; -} void dndSendStatusReq(SDnode *pDnode) { SStatusReq req = {0}; @@ -378,8 +138,10 @@ void dndSendStatusReq(SDnode *pDnode) { memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); taosRUnLockLatch(&pMgmt->latch); +#if 0 req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); dndGetVnodeLoads(pDnode, req.pVloads); +#endif int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); @@ -400,33 +162,11 @@ static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; pMgmt->clusterId = pCfg->clusterId; - dndWriteDnodes(pDnode); + dndWriteFile(pDnode); taosWUnLockLatch(&pMgmt->latch); } } -static void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps) { - int32_t numOfEps = taosArrayGetSize(pDnodeEps); - if (numOfEps <= 0) return; - - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - taosWLockLatch(&pMgmt->latch); - - int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); - if (numOfEps != numOfEpsOld) { - dndResetDnodes(pDnode, pDnodeEps); - dndWriteDnodes(pDnode); - } else { - int32_t size = numOfEps * sizeof(SDnodeEp); - if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) { - dndResetDnodes(pDnode, pDnodeEps); - dndWriteDnodes(pDnode); - } - } - - taosWUnLockLatch(&pMgmt->latch); -} - static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; @@ -434,7 +174,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId); pMgmt->dropped = 1; - dndWriteDnodes(pDnode); + dndWriteFile(pDnode); } } else { SStatusRsp statusRsp = {0}; @@ -514,14 +254,6 @@ int32_t dndInitMgmt(SDnode *pDnode) { pMgmt->clusterId = 0; taosInitRWLatch(&pMgmt->latch); - char path[PATH_MAX]; - snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode); - pMgmt->file = strdup(path); - if (pMgmt->file == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pMgmt->dnodeHash == NULL) { dError("failed to init dnode hash"); @@ -529,7 +261,7 @@ int32_t dndInitMgmt(SDnode *pDnode) { return -1; } - if (dndReadDnodes(pDnode) != 0) { + if (dndReadFile(pDnode) != 0) { dError("failed to read file:%s since %s", pMgmt->file, terrstr()); return -1; } @@ -619,6 +351,7 @@ void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { int32_t code = 0; +#if 0 switch (pMsg->msgType) { case TDMT_DND_CREATE_MNODE: code = mmProcessCreateMnodeReq(pDnode, pMsg); @@ -680,7 +413,7 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); break; } - +#endif if (pMsg->msgType & 1u) { if (code != 0) code = terrno; SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; @@ -691,3 +424,5 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { pMsg->pCont = NULL; taosFreeQitem(pMsg); } + +#endif \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/src/dndMonitor.c b/source/dnode/mgmt/dnode/src/dndMonitor.c index 54c6e384ab..53ae4bc361 100644 --- a/source/dnode/mgmt/dnode/src/dndMonitor.c +++ b/source/dnode/mgmt/dnode/src/dndMonitor.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "dndMonitor.h" +#include "dndMgmt.h" static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) { tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name)); @@ -72,7 +73,7 @@ void dndSendMonitorReport(SDnode *pDnode) { SMonBasicInfo basicInfo = {0}; dndGetMonitorBasicInfo(pDnode, &basicInfo); monSetBasicInfo(pMonitor, &basicInfo); - +#if 0 SMonClusterInfo clusterInfo = {0}; SMonVgroupInfo vgroupInfo = {0}; SMonGrantInfo grantInfo = {0}; @@ -95,7 +96,7 @@ void dndSendMonitorReport(SDnode *pDnode) { taosArrayDestroy(clusterInfo.mnodes); taosArrayDestroy(vgroupInfo.vgroups); taosArrayDestroy(diskInfo.datadirs); - +#endif monSendReport(pMonitor); monCleanupMonitorInfo(pMonitor); } \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/inc/mm.h b/source/dnode/mgmt/mnode/inc/mm.h index 2dd3ce0b5b..fde2cb7d3d 100644 --- a/source/dnode/mgmt/mnode/inc/mm.h +++ b/source/dnode/mgmt/mnode/inc/mm.h @@ -19,7 +19,7 @@ #ifdef __cplusplus extern "C" { #endif -#include "dndEnv.h" +#include "dndInt.h" // interface int32_t mmInit(SDnode *pDnode); diff --git a/source/dnode/mgmt/mnode/src/mmFile.c b/source/dnode/mgmt/mnode/src/mmFile.c index 89d7eefab2..90b5df3410 100644 --- a/source/dnode/mgmt/mnode/src/mmFile.c +++ b/source/dnode/mgmt/mnode/src/mmFile.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mm.h" +#if 0 int32_t mmReadFile(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -166,3 +167,5 @@ int32_t mmWriteFile(SDnode *pDnode) { dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } + +#endif \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/src/mmHandle.c b/source/dnode/mgmt/mnode/src/mmHandle.c index 34fe71eef7..f013daaec0 100644 --- a/source/dnode/mgmt/mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mnode/src/mmHandle.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mm.h" +#if 0 #include "dndMgmt.h" int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { @@ -137,3 +138,5 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt); return code; } + +#endif \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/src/mmMgmt.c b/source/dnode/mgmt/mnode/src/mmMgmt.c index 9c1728542b..c3a1d18db3 100644 --- a/source/dnode/mgmt/mnode/src/mmMgmt.c +++ b/source/dnode/mgmt/mnode/src/mmMgmt.c @@ -19,6 +19,7 @@ #include "dndMgmt.h" #include "dndTransport.h" +#if 0 static void mmInitOption(SDnode *pDnode, SMnodeOpt *pOption); static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption); static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption); @@ -315,3 +316,5 @@ static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) { dInfo("mnode open successfully"); return 0; } + +#endif \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index a721c5977a..a51523e2cb 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -20,6 +20,7 @@ #include "dndTransport.h" #include "dndWorker.h" +#if 0 static int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg); static int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg); static int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg); @@ -336,3 +337,5 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { rpcFreeCont(pRpc->pCont); taosFreeQitem(pMsg); } + +#endif \ No newline at end of file diff --git a/source/dnode/mgmt/qnode/src/dndQnode.h b/source/dnode/mgmt/qnode/inc/dndQnode.h similarity index 98% rename from source/dnode/mgmt/qnode/src/dndQnode.h rename to source/dnode/mgmt/qnode/inc/dndQnode.h index 2a25dca1c6..677c234679 100644 --- a/source/dnode/mgmt/qnode/src/dndQnode.h +++ b/source/dnode/mgmt/qnode/inc/dndQnode.h @@ -19,7 +19,7 @@ #ifdef __cplusplus extern "C" { #endif -#include "dndEnv.h" +#include "dndInt.h" int32_t dndInitQnode(SDnode *pDnode); void dndCleanupQnode(SDnode *pDnode); diff --git a/source/dnode/mgmt/qnode/inc/dndQnode.c b/source/dnode/mgmt/qnode/src/dndQnode.c similarity index 99% rename from source/dnode/mgmt/qnode/inc/dndQnode.c rename to source/dnode/mgmt/qnode/src/dndQnode.c index 93e2209610..cd21884522 100644 --- a/source/dnode/mgmt/qnode/inc/dndQnode.c +++ b/source/dnode/mgmt/qnode/src/dndQnode.c @@ -19,6 +19,7 @@ #include "dndTransport.h" #include "dndWorker.h" +#if 0 static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); static SQnode *dndAcquireQnode(SDnode *pDnode) { @@ -373,3 +374,5 @@ void dndCleanupQnode(SDnode *pDnode) { pMgmt->pQnode = NULL; } } + +#endif \ No newline at end of file diff --git a/source/dnode/mgmt/snode/inc/dndSnode.h b/source/dnode/mgmt/snode/inc/dndSnode.h index b21e9191e8..8cb883794d 100644 --- a/source/dnode/mgmt/snode/inc/dndSnode.h +++ b/source/dnode/mgmt/snode/inc/dndSnode.h @@ -19,7 +19,7 @@ #ifdef __cplusplus extern "C" { #endif -#include "dndEnv.h" +#include "dndInt.h" int32_t dndInitSnode(SDnode *pDnode); void dndCleanupSnode(SDnode *pDnode); diff --git a/source/dnode/mgmt/snode/src/dndSnode.c b/source/dnode/mgmt/snode/src/dndSnode.c index 4906aef246..d50c9709ff 100644 --- a/source/dnode/mgmt/snode/src/dndSnode.c +++ b/source/dnode/mgmt/snode/src/dndSnode.c @@ -19,6 +19,7 @@ #include "dndTransport.h" #include "dndWorker.h" +#if 0 static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); static SSnode *dndAcquireSnode(SDnode *pDnode) { @@ -368,3 +369,5 @@ void dndCleanupSnode(SDnode *pDnode) { pMgmt->pSnode = NULL; } } + +#endif \ No newline at end of file diff --git a/source/dnode/mgmt/vnode/inc/dndVnodes.h b/source/dnode/mgmt/vnode/inc/dndVnodes.h index 895e94060f..32f4260542 100644 --- a/source/dnode/mgmt/vnode/inc/dndVnodes.h +++ b/source/dnode/mgmt/vnode/inc/dndVnodes.h @@ -19,7 +19,7 @@ #ifdef __cplusplus extern "C" { #endif -#include "dndEnv.h" +#include "dndInt.h" int32_t dndInitVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode); diff --git a/source/dnode/mgmt/vnode/src/dndVnodes.c b/source/dnode/mgmt/vnode/src/dndVnodes.c index d311e1e417..8795551fc0 100644 --- a/source/dnode/mgmt/vnode/src/dndVnodes.c +++ b/source/dnode/mgmt/vnode/src/dndVnodes.c @@ -19,6 +19,7 @@ #include "dndTransport.h" #include "sync.h" +#if 0 typedef struct { int32_t vgId; int32_t vgVersion; @@ -1022,3 +1023,5 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { pStat->numOfBatchInsertReqs = numOfBatchInsertReqs; pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; } + +#endif \ No newline at end of file