From 4c99c6eb0ecbf7d676dee2c4f022312d71a9e8e2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 24 Nov 2021 14:20:18 +0800 Subject: [PATCH 1/7] rename file --- source/dnode/mgmt/impl/src/{dndInt.c => dnode.c} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename source/dnode/mgmt/impl/src/{dndInt.c => dnode.c} (100%) diff --git a/source/dnode/mgmt/impl/src/dndInt.c b/source/dnode/mgmt/impl/src/dnode.c similarity index 100% rename from source/dnode/mgmt/impl/src/dndInt.c rename to source/dnode/mgmt/impl/src/dnode.c From bbb280a770be2aad8f88ae76d27b9a7fe6143183 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 26 Nov 2021 10:26:08 +0800 Subject: [PATCH 2/7] minor changes --- source/dnode/mgmt/impl/inc/dndInt.h | 14 +++++++------- source/dnode/mgmt/impl/src/dnode.c | 10 +++++----- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 966781426b..fba589c73a 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -37,12 +37,12 @@ extern "C" { extern int32_t dDebugFlag; -#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("SRV FATAL ", 255, __VA_ARGS__); }} -#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("SRV ERROR ", 255, __VA_ARGS__); }} -#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("SRV WARN ", 255, __VA_ARGS__); }} -#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("SRV ", 255, __VA_ARGS__); }} -#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }} -#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }} +#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }} +#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }} +#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }} +#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", 255, __VA_ARGS__); }} +#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} +#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat; typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); @@ -119,7 +119,7 @@ EStat dndGetStat(SDnode *pDnode); void dndSetStat(SDnode *pDnode, EStat stat); char *dndStatStr(EStat stat); -void dndReportStartup(SDnode *pDnode, char *name, char *desc); +void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup); #ifdef __cplusplus diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index f4cee3f6fd..1261136fd3 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -20,8 +20,8 @@ #include "dndVnodes.h" #include "sync.h" #include "tcache.h" -#include "wal.h" #include "tcrc32c.h" +#include "wal.h" EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; } @@ -43,10 +43,10 @@ char *dndStatStr(EStat stat) { } } -void dndReportStartup(SDnode *pDnode, char *name, char *desc) { +void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc) { SStartupMsg *pStartup = &pDnode->startup; - tstrncpy(pStartup->name, name, strlen(pStartup->name)); - tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); + tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); + tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); pStartup->finished = 0; } @@ -61,7 +61,7 @@ static int32_t dndCheckRunning(char *dataDir) { FileFd fd = taosOpenFileCreateWriteTrunc(filepath); if (fd < 0) { - dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno)); + dError("failed to lock file:%s since %s, quit", filepath, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } From 04ea23584f789bd5650b8f6dc845828616dd1269 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 26 Nov 2021 11:13:45 +0800 Subject: [PATCH 3/7] refact dnode --- include/common/tglobal.h | 4 - include/dnode/mnode/mnode.h | 2 +- include/dnode/vnode/vnode.h | 2 +- include/util/tdef.h | 6 - source/common/src/tglobal.c | 9 - source/dnode/mgmt/impl/inc/dndDnode.h | 2 +- source/dnode/mgmt/impl/inc/dndInt.h | 26 +- source/dnode/mgmt/impl/src/dndDnode.c | 411 +++++++++++++------------- source/dnode/mnode/impl/src/mnode.c | 2 +- 9 files changed, 215 insertions(+), 249 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f3fce8becd..f478e96766 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -28,10 +28,6 @@ extern char tsSecond[]; extern char tsLocalFqdn[]; extern char tsLocalEp[]; extern uint16_t tsServerPort; -extern uint16_t tsDnodeShellPort; -extern uint16_t tsDnodeDnodePort; -extern uint16_t tsSyncPort; -extern uint16_t tsArbitratorPort; extern int32_t tsStatusInterval; extern int32_t tsNumOfMnodes; extern int8_t tsEnableVnodeBak; diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 98aefc6db3..b7b05f896f 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -26,7 +26,7 @@ typedef struct SMnode SMnode; typedef struct SMnodeMsg SMnodeMsg; typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell); +typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnd, SMnodeMsg *pMsg); typedef struct SMnodeLoad { diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 586cb49d0f..8dc01b2a8a 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -187,7 +187,7 @@ typedef struct { typedef struct SDnode SDnode; typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell); +typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnd, int32_t vgId, SVnodeMsg *pMsg); typedef struct { diff --git a/include/util/tdef.h b/include/util/tdef.h index d61a3e7188..76df8887a0 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -358,12 +358,6 @@ do { \ #define TSDB_DEFAULT_STABLES_HASH_SIZE 100 #define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 -#define TSDB_PORT_DNODESHELL 0 -#define TSDB_PORT_DNODEDNODE 5 -#define TSDB_PORT_SYNC 10 -#define TSDB_PORT_HTTP 11 -#define TSDB_PORT_ARBITRATOR 12 - #define TSDB_MAX_WAL_SIZE (1024*1024*3) #define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 501936d354..8d57218a64 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -33,10 +33,6 @@ char tsArbitrator[TSDB_EP_LEN] = {0}; char tsLocalFqdn[TSDB_FQDN_LEN] = {0}; char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port uint16_t tsServerPort = 6030; -uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035] -uint16_t tsDnodeDnodePort = 6035; // udp/tcp -uint16_t tsSyncPort = 6040; -uint16_t tsArbitratorPort = 6042; int32_t tsStatusInterval = 1; // second int32_t tsNumOfMnodes = 1; int8_t tsEnableVnodeBak = 1; @@ -1726,11 +1722,6 @@ int32_t taosCheckGlobalCfg() { } } - tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035] - tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp - tsSyncPort = tsServerPort + TSDB_PORT_SYNC; - tsHttpPort = tsServerPort + TSDB_PORT_HTTP; - if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index ef16b1c8f0..590a9611e1 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -30,7 +30,7 @@ int32_t dndGetDnodeId(SDnode *pDnd); int64_t dndGetClusterId(SDnode *pDnd); void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet); -void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell); +void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index fba589c73a..258f40aad6 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -31,9 +31,10 @@ extern "C" { #include "tthread.h" #include "ttime.h" #include "tworker.h" + +#include "dnode.h" #include "mnode.h" #include "vnode.h" -#include "dnode.h" extern int32_t dDebugFlag; @@ -54,17 +55,16 @@ typedef struct { } SDnodeDir; typedef struct { - int32_t dnodeId; - uint32_t rebootTime; - int32_t dropped; - int64_t clusterId; - SEpSet shellEpSet; - SEpSet peerEpSet; - char *file; - SHashObj *dnodeHash; - SDnodeEps *dnodeEps; - pthread_t *threadId; - pthread_mutex_t mutex; + int32_t dnodeId; + int32_t dropped; + uint32_t rebootTime; + int64_t clusterId; + SEpSet mnodeEpSet; + char *file; + SHashObj *dnodeHash; + SDnodeEps *dnodeEps; + pthread_t *threadId; + SRWLatch latch; } SDnodeMgmt; typedef struct { @@ -108,7 +108,7 @@ typedef struct SDnode { EStat stat; SDnodeOpt opt; SDnodeDir dir; - SDnodeMgmt d; + SDnodeMgmt dmgmt; SMnodeMgmt m; SVnodesMgmt vmgmt; STransMgmt t; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 378d76e046..962ce2b73d 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -18,28 +18,32 @@ #include "dndTransport.h" #include "dndVnodes.h" -static inline void dndLockDnode(SDnode *pDnd) { pthread_mutex_lock(&pDnd->d.mutex); } +static inline void dndRLockDnode(SDnode *pDnode) { taosRLockLatch(&pDnode->dmgmt.latch); } -static inline void dndUnLockDnode(SDnode *pDnd) { pthread_mutex_unlock(&pDnd->d.mutex); } +static inline void dndRUnLockDnode(SDnode *pDnode) { taosRUnLockLatch(&pDnode->dmgmt.latch); } -int32_t dndGetDnodeId(SDnode *pDnd) { - dndLockDnode(pDnd); - int32_t dnodeId = pDnd->d.dnodeId; - dndUnLockDnode(pDnd); +static inline void dndWLockDnode(SDnode *pDnode) { taosWLockLatch(&pDnode->dmgmt.latch); } + +static inline void dndWUnLockDnode(SDnode *pDnode) { taosWUnLockLatch(&pDnode->dmgmt.latch); } + +int32_t dndGetDnodeId(SDnode *pDnode) { + dndRLockDnode(pDnode); + int32_t dnodeId = pDnode->dmgmt.dnodeId; + dndRUnLockDnode(pDnode); return dnodeId; } -int64_t dndGetClusterId(SDnode *pDnd) { - dndLockDnode(pDnd); - int64_t clusterId = pDnd->d.clusterId; - dndUnLockDnode(pDnd); +int64_t dndGetClusterId(SDnode *pDnode) { + dndRLockDnode(pDnode); + int64_t clusterId = pDnode->dmgmt.clusterId; + dndRUnLockDnode(pDnode); return clusterId; } -void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { - dndLockDnode(pDnd); +void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { + dndRLockDnode(pDnode); - SDnodeEp *pDnodeEp = taosHashGet(pDnd->d.dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *pDnodeEp = taosHashGet(pDnode->dmgmt.dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { if (pPort != NULL) { *pPort = pDnodeEp->port; @@ -52,41 +56,26 @@ void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16 } } - dndUnLockDnode(pDnd); + dndRUnLockDnode(pDnode); } -void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet) { - dndLockDnode(pDnd); - *pEpSet = pDnd->d.peerEpSet; - dndUnLockDnode(pDnd); +void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { + dndRLockDnode(pDnode); + *pEpSet = pDnode->dmgmt.mnodeEpSet; + dndRUnLockDnode(pDnode); } -void dndGetShellEpSet(SDnode *pDnd, SEpSet *pEpSet) { - dndLockDnode(pDnd); - *pEpSet = pDnd->d.shellEpSet; - dndUnLockDnode(pDnd); -} - -void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell) { +void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { int32_t msgType = pMsg->msgType; SEpSet epSet = {0}; - if (forShell) { - dndGetShellEpSet(pDnd, &epSet); - } else { - dndGetMnodeEpSet(pDnd, &epSet); - } - - dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, taosMsg[msgType], epSet.numOfEps, epSet.inUse); + dndGetMnodeEpSet(pDnode, &epSet); + dDebug("RPC %p, msg:%s is redirected, num:%d inUse:%d", pMsg->handle, taosMsg[msgType], epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); - if (strcmp(epSet.fqdn[i], pDnd->opt.localFqdn) == 0) { - if ((epSet.port[i] == pDnd->opt.serverPort + TSDB_PORT_DNODEDNODE && !forShell) || - (epSet.port[i] == pDnd->opt.serverPort && forShell)) { - epSet.inUse = (i + 1) % epSet.numOfEps; - dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse); - } + if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) { + epSet.inUse = (i + 1) % epSet.numOfEps; } epSet.port[i] = htons(epSet.port[i]); @@ -96,220 +85,218 @@ void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell) { } static void dndUpdateMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet) { - dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); + dInfo("mnode is changed, num:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse); - dndLockDnode(pDnd); + dndWLockDnode(pDnd); - pDnd->d.peerEpSet = *pEpSet; + pDnd->dmgmt.mnodeEpSet = *pEpSet; for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); } - pDnd->d.shellEpSet = *pEpSet; - dndUnLockDnode(pDnd); + dndWUnLockDnode(pDnd); } -static void dndPrintDnodes(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +static void dndPrintDnodes(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; - dDebug("print dnode endpoint list, num:%d", pDnode->dnodeEps->num); - for (int32_t i = 0; i < pDnode->dnodeEps->num; i++) { - SDnodeEp *pEp = &pDnode->dnodeEps->eps[i]; + dDebug("print dnode ep list, num:%d", pMgmt->dnodeEps->num); + for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { + SDnodeEp *pEp = &pMgmt->dnodeEps->eps[i]; dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->fqdn, pEp->port, pEp->isMnode); } } -static void dndResetDnodes(SDnode *pDnd, SDnodeEps *pDnodeEps) { - SDnodeMgmt *pDnode = &pDnd->d; +static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; int32_t size = sizeof(SDnodeEps) + pDnodeEps->num * sizeof(SDnodeEp); - - if (pDnodeEps->num > pDnode->dnodeEps->num) { + if (pDnodeEps->num > pMgmt->dnodeEps->num) { SDnodeEps *tmp = calloc(1, size); if (tmp == NULL) return; - tfree(pDnode->dnodeEps); - pDnode->dnodeEps = tmp; + tfree(pMgmt->dnodeEps); + pMgmt->dnodeEps = tmp; } - if (pDnode->dnodeEps != pDnodeEps) { - memcpy(pDnode->dnodeEps, pDnodeEps, size); + if (pMgmt->dnodeEps != pDnodeEps) { + memcpy(pMgmt->dnodeEps, pDnodeEps, size); } - pDnode->peerEpSet.inUse = 0; - pDnode->shellEpSet.inUse = 0; + pMgmt->mnodeEpSet.inUse = 0; int32_t mIndex = 0; - for (int32_t i = 0; i < pDnode->dnodeEps->num; i++) { - SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; + for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { + SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; if (!pDnodeEp->isMnode) continue; if (mIndex >= TSDB_MAX_REPLICA) continue; - strcpy(pDnode->shellEpSet.fqdn[mIndex], pDnodeEp->fqdn); - strcpy(pDnode->peerEpSet.fqdn[mIndex], pDnodeEp->fqdn); - pDnode->shellEpSet.port[mIndex] = pDnodeEp->port; - pDnode->shellEpSet.port[mIndex] = pDnodeEp->port + TSDB_PORT_DNODEDNODE; + strcpy(pMgmt->mnodeEpSet.fqdn[mIndex], pDnodeEp->fqdn); + pMgmt->mnodeEpSet.port[mIndex] = pDnodeEp->port; mIndex++; } - for (int32_t i = 0; i < pDnode->dnodeEps->num; ++i) { - SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; - taosHashPut(pDnode->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); + for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { + SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; + taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); } - dndPrintDnodes(pDnd); + dndPrintDnodes(pDnode); } -static bool dndIsEpChanged(SDnode *pDnd, int32_t dnodeId) { +static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) { bool changed = false; - dndLockDnode(pDnd); + dndRLockDnode(pDnode); - SDnodeEp *pDnodeEp = taosHashGet(pDnd->d.dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *pDnodeEp = taosHashGet(pDnode->dmgmt.dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { char epstr[TSDB_EP_LEN + 1]; snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port); - changed = strcmp(pDnd->opt.localEp, epstr) != 0; + changed = strcmp(pEp, epstr) != 0; } - dndUnLockDnode(pDnd); + dndRUnLockDnode(pDnode); return changed; } -static int32_t dndReadDnodes(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +static int32_t dndReadDnodes(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t len = 0; int32_t maxLen = 30000; char *content = calloc(1, maxLen + 1); cJSON *root = NULL; FILE *fp = NULL; - fp = fopen(pDnode->file, "r"); - if (!fp) { - dDebug("file %s not exist", pDnode->file); + fp = fopen(pMgmt->file, "r"); + if (fp == NULL) { + dDebug("file %s not exist", pMgmt->file); + code = 0; goto PRASE_DNODE_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", pDnode->file); + dError("failed to read %s since content is null", pMgmt->file); goto PRASE_DNODE_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", pDnode->file); + dError("failed to read %s since invalid json format", pMgmt->file); goto PRASE_DNODE_OVER; } cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s since dnodeId not found", pDnode->file); + dError("failed to read %s since dnodeId not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnode->dnodeId = atoi(dnodeId->valuestring); + pMgmt->dnodeId = atoi(dnodeId->valuestring); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { - dError("failed to read %s since clusterId not found", pDnode->file); + dError("failed to read %s since clusterId not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnode->clusterId = atoll(clusterId->valuestring); + pMgmt->clusterId = atoll(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_String) { - dError("failed to read %s since dropped not found", pDnode->file); + dError("failed to read %s since dropped not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnode->dropped = atoi(dropped->valuestring); + pMgmt->dropped = atoi(dropped->valuestring); cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { - dError("failed to read %s since dnodeInfos not found", pDnode->file); + dError("failed to read %s since dnodeInfos not found", pMgmt->file); goto PRASE_DNODE_OVER; } int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); if (dnodeInfosSize <= 0) { - dError("failed to read %s since dnodeInfos size:%d invalid", pDnode->file, dnodeInfosSize); + dError("failed to read %s since dnodeInfos size:%d invalid", pMgmt->file, dnodeInfosSize); goto PRASE_DNODE_OVER; } - pDnode->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); - if (pDnode->dnodeEps == NULL) { + pMgmt->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); + if (pMgmt->dnodeEps == NULL) { dError("failed to calloc dnodeEpList since %s", strerror(errno)); goto PRASE_DNODE_OVER; } - pDnode->dnodeEps->num = dnodeInfosSize; + pMgmt->dnodeEps->num = dnodeInfosSize; for (int32_t i = 0; i < dnodeInfosSize; ++i) { cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); if (dnodeInfo == NULL) break; - SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; + SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s, dnodeId not found", pDnode->file); + dError("failed to read %s, dnodeId not found", pMgmt->file); goto PRASE_DNODE_OVER; } pDnodeEp->id = atoi(dnodeId->valuestring); cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); if (!isMnode || isMnode->type != cJSON_String) { - dError("failed to read %s, isMnode not found", pDnode->file); + dError("failed to read %s, isMnode not found", pMgmt->file); goto PRASE_DNODE_OVER; } pDnodeEp->isMnode = atoi(isMnode->valuestring); cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { - dError("failed to read %s, dnodeFqdn not found", pDnode->file); + dError("failed to read %s, dnodeFqdn not found", pMgmt->file); goto PRASE_DNODE_OVER; } tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); if (!dnodePort || dnodePort->type != cJSON_String) { - dError("failed to read %s, dnodePort not found", pDnode->file); + dError("failed to read %s, dnodePort not found", pMgmt->file); goto PRASE_DNODE_OVER; } pDnodeEp->port = atoi(dnodePort->valuestring); } - dInfo("succcessed to read file %s", pDnode->file); - dndPrintDnodes(pDnd); + code = 0; + dInfo("succcessed to read file %s", pMgmt->file); + dndPrintDnodes(pDnode); PRASE_DNODE_OVER: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); - if (dndIsEpChanged(pDnd, pDnode->dnodeId)) { - dError("localEp %s different with %s and need reconfigured", pDnd->opt.localEp, pDnode->file); + if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->opt.localEp)) { + dError("localEp %s different with %s and need reconfigured", pDnode->opt.localEp, pMgmt->file); return -1; } - if (pDnode->dnodeEps == NULL) { - pDnode->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); - pDnode->dnodeEps->num = 1; - pDnode->dnodeEps->eps[0].port = pDnd->opt.serverPort; - tstrncpy(pDnode->dnodeEps->eps[0].fqdn, pDnd->opt.localFqdn, TSDB_FQDN_LEN); + if (pMgmt->dnodeEps == NULL) { + pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); + pMgmt->dnodeEps->num = 1; + pMgmt->dnodeEps->eps[0].port = pDnode->opt.serverPort; + tstrncpy(pMgmt->dnodeEps->eps[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); } - dndResetDnodes(pDnd, pDnode->dnodeEps); + dndResetDnodes(pDnode, pMgmt->dnodeEps); terrno = 0; return 0; } -static int32_t dndWriteDnodes(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +static int32_t dndWriteDnodes(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; - FILE *fp = fopen(pDnode->file, "w"); - if (!fp) { - dError("failed to write %s since %s", pDnode->file, strerror(errno)); + FILE *fp = fopen(pMgmt->file, "w"); + if (fp == NULL) { + dError("failed to write %s since %s", pMgmt->file, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -318,17 +305,17 @@ static int32_t dndWriteDnodes(SDnode *pDnd) { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnode->dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pDnode->clusterId); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pDnode->dropped); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pMgmt->dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped); len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); - for (int32_t i = 0; i < pDnode->dnodeEps->num; ++i) { - SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; + for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { + SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnodeEp->id); len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", pDnodeEp->isMnode); len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", pDnodeEp->fqdn); len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", pDnodeEp->port); - if (i < pDnode->dnodeEps->num - 1) { + if (i < pMgmt->dnodeEps->num - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { len += snprintf(content + len, maxLen - len, " }]\n"); @@ -342,105 +329,105 @@ static int32_t dndWriteDnodes(SDnode *pDnd) { free(content); terrno = 0; - dInfo("successed to write %s", pDnode->file); + dInfo("successed to write %s", pMgmt->file); return 0; } -static void dndSendStatusMsg(SDnode *pDnd) { - int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); +static void dndSendStatusMsg(SDnode *pDnode) { + int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + SStatusMsg *pStatus = rpcMallocCont(contLen); if (pStatus == NULL) { dError("failed to malloc status message"); return; } - dndLockDnode(pDnd); - pStatus->sversion = htonl(pDnd->opt.sver); - pStatus->dnodeId = htonl(pDnd->d.dnodeId); - pStatus->clusterId = htobe64(pDnd->d.clusterId); - pStatus->rebootTime = htonl(pDnd->d.rebootTime); - pStatus->numOfCores = htonl(pDnd->opt.numOfCores); - tstrncpy(pStatus->dnodeEp, pDnd->opt.localEp, TSDB_EP_LEN); - pStatus->clusterCfg.statusInterval = htonl(pDnd->opt.statusInterval); - tstrncpy(pStatus->clusterCfg.timezone, pDnd->opt.timezone, TSDB_TIMEZONE_LEN); - tstrncpy(pStatus->clusterCfg.locale, pDnd->opt.locale, TSDB_LOCALE_LEN); - tstrncpy(pStatus->clusterCfg.charset, pDnd->opt.charset, TSDB_LOCALE_LEN); + dndRLockDnode(pDnode); + pStatus->sversion = htonl(pDnode->opt.sver); + pStatus->dnodeId = htonl(pDnode->dmgmt.dnodeId); + pStatus->clusterId = htobe64(pDnode->dmgmt.clusterId); + pStatus->rebootTime = htonl(pDnode->dmgmt.rebootTime); + pStatus->numOfCores = htonl(pDnode->opt.numOfCores); + tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); + pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); + tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN); + tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN); + tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN); pStatus->clusterCfg.checkTime = 0; char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - dndUnLockDnode(pDnd); + dndRUnLockDnode(pDnode); - dndGetVnodeLoads(pDnd, &pStatus->vnodeLoads); + dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; - dndSendMsgToMnode(pDnd, &rpcMsg); + dndSendMsgToMnode(pDnode, &rpcMsg); } -static void dndUpdateDnodeCfg(SDnode *pDnd, SDnodeCfg *pCfg) { - SDnodeMgmt *pDnode = &pDnd->d; - if (pDnode->dnodeId != 0 && pDnode->dropped != pCfg->dropped) return; +static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) { + dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); - dndLockDnode(pDnd); - - pDnode->dnodeId = pCfg->dnodeId; - pDnode->clusterId = pCfg->clusterId; - pDnode->dropped = pCfg->dropped; - dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); - - dndWriteDnodes(pDnd); - dndUnLockDnode(pDnd); + dndWLockDnode(pDnode); + pMgmt->dnodeId = pCfg->dnodeId; + pMgmt->clusterId = pCfg->clusterId; + pMgmt->dropped = pCfg->dropped; + (void)dndWriteDnodes(pDnode); + dndWUnLockDnode(pDnode); + } } -static void dndUpdateDnodeEps(SDnode *pDnd, SDnodeEps *pDnodeEps) { +static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) { if (pDnodeEps == NULL || pDnodeEps->num <= 0) return; - dndLockDnode(pDnd); + dndWLockDnode(pDnode); - if (pDnodeEps->num != pDnd->d.dnodeEps->num) { - dndResetDnodes(pDnd, pDnodeEps); - dndWriteDnodes(pDnd); + if (pDnodeEps->num != pDnode->dmgmt.dnodeEps->num) { + dndResetDnodes(pDnode, pDnodeEps); + dndWriteDnodes(pDnode); } else { int32_t size = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps); - if (memcmp(pDnd->d.dnodeEps, pDnodeEps, size) != 0) { - dndResetDnodes(pDnd, pDnodeEps); - dndWriteDnodes(pDnd); + if (memcmp(pDnode->dmgmt.dnodeEps, pDnodeEps, size) != 0) { + dndResetDnodes(pDnode, pDnodeEps); + dndWriteDnodes(pDnode); } } - dndUnLockDnode(pDnd); + dndWUnLockDnode(pDnode); } -static void dndProcessStatusRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { +static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { if (pEpSet && pEpSet->numOfEps > 0) { - dndUpdateMnodeEpSet(pDnd, pEpSet); + dndUpdateMnodeEpSet(pDnode, pEpSet); } if (pMsg->code != TSDB_CODE_SUCCESS) return; - SStatusRsp *pStatusRsp = pMsg->pCont; - SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; + SStatusRsp *pRsp = pMsg->pCont; + SDnodeCfg *pCfg = &pRsp->dnodeCfg; pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->clusterId = htobe64(pCfg->clusterId); - dndUpdateDnodeCfg(pDnd, pCfg); + dndUpdateDnodeCfg(pDnode, pCfg); if (pCfg->dropped) return; - SDnodeEps *pDnodeEps = &pStatusRsp->dnodeEps; + SDnodeEps *pDnodeEps = &pRsp->dnodeEps; pDnodeEps->num = htonl(pDnodeEps->num); for (int32_t i = 0; i < pDnodeEps->num; ++i) { pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); } - dndUpdateDnodeEps(pDnd, pDnodeEps); + dndUpdateDnodeEps(pDnode, pDnodeEps); } -static void dndProcessAuthRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } +static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } -static void dndProcessGrantRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } +static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } -static void dndProcessConfigDnodeReq(SDnode *pDnd, SRpcMsg *pMsg) { +static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { SCfgDnodeMsg *pCfg = pMsg->pCont; int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; @@ -449,11 +436,11 @@ static void dndProcessConfigDnodeReq(SDnode *pDnd, SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static void dndProcessStartupReq(SDnode *pDnd, SRpcMsg *pMsg) { +static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { dInfo("startup msg is received"); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); - dndGetStartup(pDnd, pStartup); + dndGetStartup(pDnode, pStartup); dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); @@ -463,52 +450,52 @@ static void dndProcessStartupReq(SDnode *pDnd, SRpcMsg *pMsg) { } static void *dnodeThreadRoutine(void *param) { - SDnode *pDnd = param; - int32_t ms = pDnd->opt.statusInterval * 1000; + SDnode *pDnode = param; + int32_t ms = pDnode->opt.statusInterval * 1000; while (true) { taosMsleep(ms); - if (dndGetStat(pDnd) != DND_STAT_RUNNING) { + if (dndGetStat(pDnode) != DND_STAT_RUNNING) { continue; } pthread_testcancel(); - dndSendStatusMsg(pDnd); + dndSendStatusMsg(pDnode); } } -int32_t dndInitDnode(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +int32_t dndInitDnode(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; - pDnode->dnodeId = 0; - pDnode->rebootTime = taosGetTimestampSec(); - pDnode->dropped = 0; - pDnode->clusterId = 0; + pMgmt->dnodeId = 0; + pMgmt->rebootTime = taosGetTimestampSec(); + pMgmt->dropped = 0; + pMgmt->clusterId = 0; char path[PATH_MAX]; - snprintf(path, PATH_MAX, "%s/dnode.json", pDnd->dir.dnode); - pDnode->file = strdup(path); - if (pDnode->file == NULL) { + snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode); + pMgmt->file = strdup(path); + if (pMgmt->file == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pDnode->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (pDnode->dnodeHash == NULL) { + pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pMgmt->dnodeHash == NULL) { dError("failed to init dnode hash"); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (dndReadDnodes(pDnd) != 0) { - dError("failed to read file:%s since %s", pDnode->file, terrstr()); + if (dndReadDnodes(pDnode) != 0) { + dError("failed to read file:%s since %s", pMgmt->file, terrstr()); return -1; } - pthread_mutex_init(&pDnode->mutex, NULL); + taosInitRWLatch(&pMgmt->latch); - pDnode->threadId = taosCreateThread(dnodeThreadRoutine, pDnd); - if (pDnode->threadId == NULL) { + pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); + if (pMgmt->threadId == NULL) { dError("failed to init dnode thread"); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -518,44 +505,42 @@ int32_t dndInitDnode(SDnode *pDnd) { return 0; } -void dndCleanupDnode(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +void dndCleanupDnode(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; - if (pDnode->threadId != NULL) { - taosDestoryThread(pDnode->threadId); - pDnode->threadId = NULL; + if (pMgmt->threadId != NULL) { + taosDestoryThread(pMgmt->threadId); + pMgmt->threadId = NULL; } - dndLockDnode(pDnd); + dndWLockDnode(pDnode); - if (pDnode->dnodeEps != NULL) { - free(pDnode->dnodeEps); - pDnode->dnodeEps = NULL; + if (pMgmt->dnodeEps != NULL) { + free(pMgmt->dnodeEps); + pMgmt->dnodeEps = NULL; } - if (pDnode->dnodeHash != NULL) { - taosHashCleanup(pDnode->dnodeHash); - pDnode->dnodeHash = NULL; + if (pMgmt->dnodeHash != NULL) { + taosHashCleanup(pMgmt->dnodeHash); + pMgmt->dnodeHash = NULL; } - if (pDnode->file != NULL) { - free(pDnode->file); - pDnode->file = NULL; + if (pMgmt->file != NULL) { + free(pMgmt->file); + pMgmt->file = NULL; } - dndUnLockDnode(pDnd); - pthread_mutex_destroy(&pDnode->mutex); - + dndWUnLockDnode(pDnode); dInfo("dnd-dnode is cleaned up"); } -void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { +void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { switch (pMsg->msgType) { case TSDB_MSG_TYPE_NETWORK_TEST: - dndProcessStartupReq(pDnd, pMsg); + dndProcessStartupReq(pDnode, pMsg); break; case TSDB_MSG_TYPE_CONFIG_DNODE_IN: - dndProcessConfigDnodeReq(pDnd, pMsg); + dndProcessConfigDnodeReq(pDnode, pMsg); break; default: dError("RPC %p, dnode req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); @@ -565,16 +550,16 @@ void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { } } -void dndProcessDnodeRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { +void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { switch (pMsg->msgType) { case TSDB_MSG_TYPE_STATUS_RSP: - dndProcessStatusRsp(pDnd, pMsg, pEpSet); + dndProcessStatusRsp(pDnode, pMsg, pEpSet); break; case TSDB_MSG_TYPE_AUTH_RSP: - dndProcessAuthRsp(pDnd, pMsg, pEpSet); + dndProcessAuthRsp(pDnode, pMsg, pEpSet); break; case TSDB_MSG_TYPE_GRANT_RSP: - dndProcessGrantRsp(pDnd, pMsg, pEpSet); + dndProcessGrantRsp(pDnode, pMsg, pEpSet); break; default: dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 5eff8a37ca..bd89476cef 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -53,7 +53,7 @@ void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg) { void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell) { assert(pMnode); - (*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg, forShell); + (*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg); } static int32_t mnodeInitTimer() { From b6c28a4914f2418194b1c79c8a85af0c971f7887 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 26 Nov 2021 11:42:56 +0800 Subject: [PATCH 4/7] update authentications --- source/dnode/mgmt/impl/inc/dndInt.h | 4 +- source/dnode/mgmt/impl/src/dndDnode.c | 8 +-- source/dnode/mgmt/impl/src/dndMnode.c | 76 +++++++++++----------- source/dnode/mgmt/impl/src/dndTransport.c | 78 +++++++++++++++++------ source/dnode/mnode/impl/src/mnodeAuth.c | 11 ---- 5 files changed, 101 insertions(+), 76 deletions(-) diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 258f40aad6..4094871bcd 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -109,9 +109,9 @@ typedef struct SDnode { SDnodeOpt opt; SDnodeDir dir; SDnodeMgmt dmgmt; - SMnodeMgmt m; + SMnodeMgmt mmgmt; SVnodesMgmt vmgmt; - STransMgmt t; + STransMgmt tmgmt; SStartupMsg startup; } SDnode; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 962ce2b73d..fcafaf828f 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -84,17 +84,17 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { rpcSendRedirectRsp(pMsg->handle, &epSet); } -static void dndUpdateMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet) { +static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { dInfo("mnode is changed, num:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse); - dndWLockDnode(pDnd); + dndWLockDnode(pDnode); - pDnd->dmgmt.mnodeEpSet = *pEpSet; + pDnode->dmgmt.mnodeEpSet = *pEpSet; for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); } - dndWUnLockDnode(pDnd); + dndWUnLockDnode(pDnode); } static void dndPrintDnodes(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 5f3e48d8a1..d581c7761c 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -67,7 +67,7 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); static SMnode *dndAcquireMnode(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = NULL; int32_t refCount = 0; @@ -85,7 +85,7 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) { } static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); @@ -98,7 +98,7 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { } static int32_t dndReadMnodeFile(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; int32_t len = 0; int32_t maxLen = 300; @@ -152,7 +152,7 @@ PRASE_MNODE_OVER: } static int32_t dndWriteMnodeFile(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; char file[PATH_MAX + 20] = {0}; snprintf(file, sizeof(file), "%s.bak", pMgmt->file); @@ -212,7 +212,7 @@ static int32_t dndStartMnodeWorker(SDnode *pDnode) { } static void dndStopMnodeWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosWLockLatch(&pMgmt->latch); pMgmt->deployed = 0; @@ -296,7 +296,7 @@ static int32_t dndBuildMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions, SCr } static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = dndStartMnodeWorker(pDnode); if (code != 0) { @@ -332,7 +332,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) { } static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL) { @@ -351,7 +351,7 @@ static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) { } static int32_t dndDropMnode(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL) { @@ -458,7 +458,7 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { } static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { @@ -472,7 +472,7 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { } static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { @@ -486,7 +486,7 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { } static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { @@ -500,7 +500,7 @@ static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { } static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { @@ -532,7 +532,7 @@ static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMs } void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); @@ -545,7 +545,7 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { } void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) { SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; @@ -557,7 +557,7 @@ void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { } void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) { SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; @@ -569,7 +569,7 @@ void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { } void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) { SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; @@ -581,7 +581,7 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { } static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL) { @@ -594,7 +594,7 @@ static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { } static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, NULL, (FProcessItem)dndProcessMnodeMgmtQueue); if (pMgmt->pMgmtQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -604,13 +604,13 @@ static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) { } static void dndFreeMnodeMgmtQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ); pMgmt->pMgmtQ = NULL; } static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SWorkerPool *pPool = &pMgmt->mgmtPool; pPool->name = "mnode-mgmt"; pPool->min = 1; @@ -624,13 +624,13 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { } static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; ; tWorkerCleanup(&pMgmt->mgmtPool); } static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, NULL, (FProcessItem)dndProcessMnodeReadQueue); if (pMgmt->pReadQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -640,13 +640,13 @@ static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { } static void dndFreeMnodeReadQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerFreeQueue(&pMgmt->readPool, pMgmt->pReadQ); pMgmt->pReadQ = NULL; } static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SWorkerPool *pPool = &pMgmt->readPool; pPool->name = "mnode-read"; pPool->min = 0; @@ -660,12 +660,12 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { } static void dndCleanupMnodeReadWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->readPool); } static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, NULL, (FProcessItem)dndProcessMnodeWriteQueue); if (pMgmt->pWriteQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -675,13 +675,13 @@ static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) { } static void dndFreeMnodeWriteQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pWriteQ); pMgmt->pWriteQ = NULL; } static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, NULL, (FProcessItem)dndProcessMnodeApplyQueue); if (pMgmt->pApplyQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -691,13 +691,13 @@ static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) { } static void dndFreeMnodeApplyQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pApplyQ); pMgmt->pApplyQ = NULL; } static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SWorkerPool *pPool = &pMgmt->writePool; pPool->name = "mnode-write"; pPool->min = 0; @@ -711,12 +711,12 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { } static void dndCleanupMnodeWriteWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->writePool); } static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, NULL, (FProcessItem)dndProcessMnodeSyncQueue); if (pMgmt->pSyncQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -726,13 +726,13 @@ static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) { } static void dndFreeMnodeSyncQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerFreeQueue(&pMgmt->syncPool, pMgmt->pSyncQ); pMgmt->pSyncQ = NULL; } static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SWorkerPool *pPool = &pMgmt->syncPool; pPool->name = "mnode-sync"; pPool->min = 0; @@ -741,13 +741,13 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { } static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->syncPool); } int32_t dndInitMnode(SDnode *pDnode) { dInfo("dnode-mnode start to init"); - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosInitRWLatch(&pMgmt->latch); if (dndInitMnodeMgmtWorker(pDnode) != 0) { @@ -791,7 +791,7 @@ int32_t dndInitMnode(SDnode *pDnode) { } void dndCleanupMnode(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; dInfo("dnode-mnode start to clean up"); dndStopMnodeWorker(pDnode); @@ -801,7 +801,7 @@ void dndCleanupMnode(SDnode *pDnode) { } int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - SMnodeMgmt *pMgmt = &pDnode->m; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL) { diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 679e3ef5f9..d5f52bac8b 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -25,6 +25,10 @@ #include "dndMnode.h" #include "dndVnodes.h" +#define INTERNAL_USER "_internal" +#define INTERNAL_CKEY "_key" +#define INTERNAL_SECRET "_secret" + static void dndInitMsgFp(STransMgmt *pMgmt) { // msg from client to dnode pMgmt->msgFp[TSDB_MSG_TYPE_SUBMIT] = dndProcessVnodeWriteMsg; @@ -121,7 +125,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SDnode *pDnode = parent; - STransMgmt *pMgmt = &pDnode->t; + STransMgmt *pMgmt = &pDnode->tmgmt; int32_t msgType = pMsg->msgType; @@ -143,19 +147,19 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { } static int32_t dndInitClient(SDnode *pDnode) { - STransMgmt *pMgmt = &pDnode->t; + STransMgmt *pMgmt = &pDnode->tmgmt; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.label = "DND-C"; rpcInit.numOfThreads = 1; rpcInit.cfp = dndProcessResponse; - rpcInit.sessions = TSDB_MAX_VNODES << 4; + rpcInit.sessions = 8; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000; - rpcInit.user = "-internal"; - rpcInit.ckey = "-key"; - rpcInit.secret = "-secret"; + rpcInit.user = INTERNAL_USER; + rpcInit.ckey = INTERNAL_CKEY; + rpcInit.secret = INTERNAL_SECRET; pMgmt->clientRpc = rpcOpen(&rpcInit); if (pMgmt->clientRpc == NULL) { @@ -167,7 +171,7 @@ static int32_t dndInitClient(SDnode *pDnode) { } static void dndCleanupClient(SDnode *pDnode) { - STransMgmt *pMgmt = &pDnode->t; + STransMgmt *pMgmt = &pDnode->tmgmt; if (pMgmt->clientRpc) { rpcClose(pMgmt->clientRpc); pMgmt->clientRpc = NULL; @@ -176,8 +180,8 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { - SDnode *pDnode = param; - STransMgmt *pMgmt = &pDnode->t; + SDnode *pDnode = param; + STransMgmt *pMgmt = &pDnode->tmgmt; int32_t msgType = pMsg->msgType; if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { @@ -218,24 +222,56 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { } static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) { - STransMgmt *pMgmt = &pDnode->t; + STransMgmt *pMgmt = &pDnode->tmgmt; SEpSet epSet = {0}; dndGetMnodeEpSet(pDnode, &epSet); rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); } +static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { + if (strcmp(user, INTERNAL_USER) == 0) { + // A simple temporary implementation + char pass[32] = {0}; + taosEncryptPass((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); + memcpy(secret, pass, TSDB_KEY_LEN); + *spi = 0; + *encrypt = 0; + *ckey = 0; + return 0; + } else if (strcmp(user, TSDB_NETTEST_USER) == 0) { + // A simple temporary implementation + char pass[32] = {0}; + taosEncryptPass((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass); + memcpy(secret, pass, TSDB_KEY_LEN); + *spi = 0; + *encrypt = 0; + *ckey = 0; + return 0; + } else { + return -1; + } +} + static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) { SDnode *pDnode = parent; - if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) != 0) { - if (terrno != TSDB_CODE_APP_NOT_READY) { - dTrace("failed to get user auth from mnode since %s", terrstr()); - return -1; - } + if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) { + dTrace("get internal auth success"); + return 0; } - dDebug("user:%s, send auth msg to mnodes", user); + if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) == 0) { + dTrace("get auth from internal mnode"); + return 0; + } + + if (terrno != TSDB_CODE_APP_NOT_READY) { + dTrace("failed to get user auth from internal mnode since %s", terrstr()); + return -1; + } + + dDebug("user:%s, send auth msg to other mnodes", user); SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); tstrncpy(pMsg->user, user, TSDB_USER_LEN); @@ -246,14 +282,14 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char if (rpcRsp.code != 0) { terrno = rpcRsp.code; - dError("user:%s, failed to get user auth from mnodes since %s", user, terrstr()); + dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr()); } else { SAuthRsp *pRsp = rpcRsp.pCont; memcpy(secret, pRsp->secret, TSDB_KEY_LEN); memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN); *spi = pRsp->spi; *encrypt = pRsp->encrypt; - dDebug("user:%s, success to get user auth from mnodes", user); + dDebug("user:%s, success to get user auth from other mnodes", user); } rpcFreeCont(rpcRsp.pCont); @@ -261,7 +297,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char } static int32_t dndInitServer(SDnode *pDnode) { - STransMgmt *pMgmt = &pDnode->t; + STransMgmt *pMgmt = &pDnode->tmgmt; dndInitMsgFp(pMgmt); int32_t numOfThreads = (int32_t)((pDnode->opt.numOfCores * pDnode->opt.numOfThreadsPerCore) / 2.0); @@ -290,7 +326,7 @@ static int32_t dndInitServer(SDnode *pDnode) { } static void dndCleanupServer(SDnode *pDnode) { - STransMgmt *pMgmt = &pDnode->t; + STransMgmt *pMgmt = &pDnode->tmgmt; if (pMgmt->serverRpc) { rpcClose(pMgmt->serverRpc); pMgmt->serverRpc = NULL; @@ -317,7 +353,7 @@ void dndCleanupTrans(SDnode *pDnode) { } void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) { - STransMgmt *pMgmt = &pDnode->t; + STransMgmt *pMgmt = &pDnode->tmgmt; rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL); } diff --git a/source/dnode/mnode/impl/src/mnodeAuth.c b/source/dnode/mnode/impl/src/mnodeAuth.c index ddd2b91ff3..f8e704d16d 100644 --- a/source/dnode/mnode/impl/src/mnodeAuth.c +++ b/source/dnode/mnode/impl/src/mnodeAuth.c @@ -21,16 +21,5 @@ int32_t mnodeInitAuth() { return 0; } void mnodeCleanupAuth() {} int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - if (strcmp(user, TSDB_NETTEST_USER) == 0) { - char pass[32] = {0}; - taosEncryptPass((uint8_t *)user, strlen(user), pass); - *spi = 0; - *encrypt = 0; - *ckey = 0; - memcpy(secret, pass, TSDB_KEY_LEN); - mDebug("nettest user is authorized"); - return 0; - } - return 0; } \ No newline at end of file From 0d57a4beb70e0d1192c1768b4b22d4a02534172b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 26 Nov 2021 14:15:22 +0800 Subject: [PATCH 5/7] TD-11265 invalid write in dnode.c --- include/dnode/mnode/mnode.h | 8 ++++---- include/dnode/vnode/vnode.h | 8 ++++---- source/dnode/mgmt/daemon/src/daemon.c | 12 ++++++------ source/dnode/mgmt/impl/inc/dndDnode.h | 18 +++++++++--------- source/dnode/mgmt/impl/src/dndDnode.c | 2 +- source/dnode/mgmt/impl/src/dnode.c | 8 ++++---- source/dnode/mnode/impl/inc/mnodeDef.h | 4 ++-- 7 files changed, 30 insertions(+), 30 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index b7b05f896f..725bdaec3c 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -24,10 +24,10 @@ extern "C" { typedef struct SDnode SDnode; typedef struct SMnode SMnode; typedef struct SMnodeMsg SMnodeMsg; -typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); -typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnd, SMnodeMsg *pMsg); +typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg); typedef struct SMnodeLoad { int64_t numOfDnode; diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 8dc01b2a8a..3f6705fac6 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -185,10 +185,10 @@ typedef struct { } SVnodeMsg; typedef struct SDnode SDnode; -typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); -typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnd, int32_t vgId, SVnodeMsg *pMsg); +typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); typedef struct { PutMsgToVnodeQFp putMsgToApplyQueueFp; diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index 4d1116466c..429b097fb8 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -141,13 +141,13 @@ void dmnInitOption(SDnodeOpt *pOption) { pOption->shellActivityTimer = tsShellActivityTimer; pOption->statusInterval = tsStatusInterval; pOption->serverPort = tsServerPort; - tstrncpy(pOption->dataDir, tsDataDir, TSDB_EP_LEN); + tstrncpy(pOption->dataDir, tsDataDir, PATH_MAX); tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN); - tstrncpy(pOption->localFqdn, tsLocalEp, TSDB_FQDN_LEN); - tstrncpy(pOption->firstEp, tsFirst, TSDB_FQDN_LEN); - tstrncpy(pOption->timezone, tsLocalEp, TSDB_TIMEZONE_LEN); - tstrncpy(pOption->locale, tsLocalEp, TSDB_LOCALE_LEN); - tstrncpy(pOption->charset, tsLocalEp, TSDB_LOCALE_LEN); + tstrncpy(pOption->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN); + tstrncpy(pOption->firstEp, tsFirst, TSDB_EP_LEN); + tstrncpy(pOption->timezone, tsTimezone, TSDB_TIMEZONE_LEN); + tstrncpy(pOption->locale, tsLocale, TSDB_LOCALE_LEN); + tstrncpy(pOption->charset, tsCharset, TSDB_LOCALE_LEN); } int dmnRunDnode() { diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index 590a9611e1..4bb4cad8cc 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -21,16 +21,16 @@ extern "C" { #endif #include "dndInt.h" -int32_t dndInitDnode(SDnode *pDnd); -void dndCleanupDnode(SDnode *pDnd); -void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessDnodeRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet); +int32_t dndInitDnode(SDnode *pDnode); +void dndCleanupDnode(SDnode *pDnode); +void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t dndGetDnodeId(SDnode *pDnd); -int64_t dndGetClusterId(SDnode *pDnd); -void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); -void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet); -void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg); +int32_t dndGetDnodeId(SDnode *pDnode); +int64_t dndGetClusterId(SDnode *pDnode); +void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); +void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); +void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index fcafaf828f..ef30503494 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -480,7 +480,7 @@ int32_t dndInitDnode(SDnode *pDnode) { return -1; } - pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pMgmt->dnodeHash == NULL) { dError("failed to init dnode hash"); terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 1261136fd3..0bfcf5d721 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -84,13 +84,13 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) { char path[PATH_MAX + 100]; snprintf(path, sizeof(path), "%s%smnode", pOptions->dataDir, TD_DIRSEP); - pDnode->dir.mnode = strdup(path); + pDnode->dir.mnode = tstrdup(path); snprintf(path, sizeof(path), "%s%svnode", pOptions->dataDir, TD_DIRSEP); - pDnode->dir.vnodes = strdup(path); + pDnode->dir.vnodes = tstrdup(path); snprintf(path, sizeof(path), "%s%sdnode", pOptions->dataDir, TD_DIRSEP); - pDnode->dir.dnode = strdup(path); + pDnode->dir.dnode = tstrdup(path); if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) { dError("failed to malloc dir object"); @@ -140,7 +140,7 @@ SDnode *dndInit(SDnodeOpt *pOptions) { taosBlockSIGPIPE(); taosResolveCRC(); - SDnode *pDnode = calloc(1, sizeof(pDnode)); + SDnode *pDnode = calloc(1, sizeof(SDnode)); if (pDnode == NULL) { dError("failed to create dnode object"); terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mnode/impl/inc/mnodeDef.h b/source/dnode/mnode/impl/inc/mnodeDef.h index 4b4c4abdb3..ccdba13006 100644 --- a/source/dnode/mnode/impl/inc/mnodeDef.h +++ b/source/dnode/mnode/impl/inc/mnodeDef.h @@ -131,7 +131,7 @@ typedef struct SMnodeObj { int64_t roleTime; int64_t createdTime; int64_t updateTime; - SDnodeObj *pDnd; + SDnodeObj *pDnode; } SMnodeObj; typedef struct { @@ -215,7 +215,7 @@ typedef struct SDbObj { typedef struct { int32_t dnodeId; int8_t role; - SDnodeObj *pDnd; + SDnodeObj *pDnode; } SVnodeGid; typedef struct SVgObj { From 7906e78257565977d4a5ac1f7a81ebfe267dd657 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 26 Nov 2021 14:43:56 +0800 Subject: [PATCH 6/7] TD-11265 refact dndVnodes --- include/dnode/mgmt/dnode.h | 2 +- include/dnode/vnode/vnode.h | 18 ----------- source/dnode/mgmt/daemon/src/daemon.c | 2 +- source/dnode/mgmt/impl/src/dndDnode.c | 13 ++++---- source/dnode/mgmt/impl/src/dndMnode.c | 6 ++-- source/dnode/mgmt/impl/src/dndVnodes.c | 44 +++++++++++++++++--------- source/dnode/mgmt/impl/src/dnode.c | 1 + source/dnode/vnode/impl/src/vnodeInt.c | 7 ++-- 8 files changed, 43 insertions(+), 50 deletions(-) diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 7dd7730443..f43fe107fe 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -78,7 +78,7 @@ typedef struct { * @brief data file's directory. * */ - char dataDir[PATH_MAX]; + char dataDir[TSDB_FILENAME_LEN]; /** * @brief local endpoint. diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 3f6705fac6..1edd93f509 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -184,27 +184,9 @@ typedef struct { SRpcMsg rpcMsg[]; } SVnodeMsg; -typedef struct SDnode SDnode; -typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); - -typedef struct { - PutMsgToVnodeQFp putMsgToApplyQueueFp; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; -} SVnodePara; - -int32_t vnodeInit(SVnodePara); -void vnodeCleanup(); - int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); -SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg); -void vnodeDrop(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); - int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); SVnodeMsg *vnodeInitMsg(int32_t msgNum); diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index 429b097fb8..effaec66a8 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -141,7 +141,7 @@ void dmnInitOption(SDnodeOpt *pOption) { pOption->shellActivityTimer = tsShellActivityTimer; pOption->statusInterval = tsStatusInterval; pOption->serverPort = tsServerPort; - tstrncpy(pOption->dataDir, tsDataDir, PATH_MAX); + tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN); tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN); tstrncpy(pOption->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN); tstrncpy(pOption->firstEp, tsFirst, TSDB_EP_LEN); diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index ef30503494..fa601a0d99 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -455,12 +455,11 @@ static void *dnodeThreadRoutine(void *param) { while (true) { taosMsleep(ms); - if (dndGetStat(pDnode) != DND_STAT_RUNNING) { - continue; - } - pthread_testcancel(); - dndSendStatusMsg(pDnode); + + if (dndGetStat(pDnode) == DND_STAT_RUNNING) { + dndSendStatusMsg(pDnode); + } } } @@ -501,7 +500,7 @@ int32_t dndInitDnode(SDnode *pDnode) { return -1; } - dInfo("dnd-dnode is initialized"); + dInfo("dnode-dnode is initialized"); return 0; } @@ -531,7 +530,7 @@ void dndCleanupDnode(SDnode *pDnode) { } dndWUnLockDnode(pDnode); - dInfo("dnd-dnode is cleaned up"); + dInfo("dnode-dnode is cleaned up"); } void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index d581c7761c..9b3435a49e 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -616,7 +616,7 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { pPool->min = 1; pPool->max = 1; if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -652,7 +652,7 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { pPool->min = 0; pPool->max = 1; if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -703,7 +703,7 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { pPool->min = 0; pPool->max = 1; if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index d5e94106a7..ac3e55ffa7 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -22,6 +22,7 @@ typedef struct { int32_t refCount; int8_t dropped; int8_t accessState; + char *path; SVnode *pImpl; taos_queue pWriteQ; taos_queue pSyncQ; @@ -74,7 +75,7 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnode static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl); +static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl); static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode); static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes); static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes); @@ -125,7 +126,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { } } -static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl) { +static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { @@ -139,6 +140,12 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->pImpl = pImpl; + pVnode->path = tstrdup(path); + if (pVnode->path == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) { return -1; } @@ -354,22 +361,25 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg) { char path[PATH_MAX + 20] = {0}; snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, vgId); - SVnode *pImpl = vnodeCreate(vgId, path, pCfg); + // SVnode *pImpl = vnodeCreate(vgId, path, pCfg); + SVnode *pImpl = vnodeOpen(path, NULL); if (pImpl == NULL) { return -1; } - int32_t code = dndCreateVnodeWrapper(pDnode, vgId, pImpl); + int32_t code = dndCreateVnodeWrapper(pDnode, vgId, path, pImpl); if (code != 0) { - vnodeDrop(pImpl); + vnodeClose(pImpl); + vnodeDestroy(path); terrno = code; return code; } code = dndWriteVnodesToFile(pDnode); if (code != 0) { - vnodeDrop(pImpl); + vnodeClose(pImpl); + vnodeDestroy(path); terrno = code; return code; } @@ -385,7 +395,8 @@ static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) { } dndDropVnodeWrapper(pDnode, pVnode); - vnodeDrop(pVnode->pImpl); + vnodeClose(pVnode->pImpl); + vnodeDestroy(pVnode->path); dndWriteVnodesToFile(pDnode); return 0; } @@ -413,7 +424,7 @@ static void *dnodeOpenVnodeFunc(void *param) { dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex); pThread->failed++; } else { - dndCreateVnodeWrapper(pDnode, pVnode->vgId, pImpl); + dndCreateVnodeWrapper(pDnode, pVnode->vgId, path, pImpl); dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex); pThread->opened++; } @@ -433,7 +444,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pMgmt->hash == NULL) { dError("failed to init vnode hash"); - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -874,13 +885,13 @@ static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) { pPool->min = 1; pPool->max = 1; if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } pMgmt->pMgmtQ = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)dndProcessVnodeMgmtQueue); if (pMgmt->pMgmtQ == NULL) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -918,6 +929,7 @@ static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + return 0; } @@ -938,7 +950,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) { pPool->min = (int32_t)threadsForQuery; pPool->max = pPool->min; if (tWorkerInit(pPool) != 0) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } pPool = &pMgmt->fetchPool; @@ -946,7 +959,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) { pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores); pPool->max = pPool->min; if (tWorkerInit(pPool) != 0) { - TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } return 0; @@ -998,7 +1012,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { pPool->name = "vnode-write"; pPool->max = tsNumOfCores; if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -1036,7 +1050,7 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { pPool->name = "vnode-sync"; pPool->max = maxThreads; if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 0bfcf5d721..aa0070cfa9 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -116,6 +116,7 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) { return -1; } + memcpy(&pDnode->opt, pOptions, sizeof(SDnodeOpt)); return 0; } diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 2cbdf318a2..8a6fc8bf5e 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -17,9 +17,6 @@ #include "vnodeInt.h" #include "tqueue.h" -int32_t vnodeInit(SVnodePara para) { return 0; } -void vnodeCleanup() {} - int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; } void vnodeDrop(SVnode *pVnode) {} @@ -31,7 +28,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } SVnodeMsg *vnodeInitMsg(int32_t msgNum) { SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg)); if (pMsg == NULL) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } else { pMsg->allocNum = msgNum; @@ -41,7 +38,7 @@ SVnodeMsg *vnodeInitMsg(int32_t msgNum) { int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) { if (pMsg->curNum >= pMsg->allocNum) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg; From aad478d34f218bbd45ec11e4d900dde505b9aad3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 26 Nov 2021 16:28:38 +0800 Subject: [PATCH 7/7] TD-11265 refact dndMnode --- include/dnode/mgmt/dnode.h | 4 +- include/dnode/mnode/mnode.h | 10 +-- source/dnode/mgmt/impl/src/dndMnode.c | 107 ++++++++++++++----------- source/dnode/mgmt/impl/src/dndVnodes.c | 2 +- source/dnode/mgmt/impl/src/dnode.c | 16 ++-- source/dnode/mnode/impl/inc/mnodeInt.h | 2 +- source/dnode/mnode/impl/src/mnode.c | 28 +++---- 7 files changed, 91 insertions(+), 78 deletions(-) diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index f43fe107fe..fe9560d427 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -121,10 +121,10 @@ typedef struct { /** * @brief Initialize and start the dnode. * - * @param pOptions Options of the dnode. + * @param pOption Option of the dnode. * @return SDnode* The dnode object. */ -SDnode *dndInit(SDnodeOpt *pOptions); +SDnode *dndInit(SDnodeOpt *pOption); /** * @brief Stop and cleanup the dnode. diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 725bdaec3c..824eb24191 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -53,17 +53,17 @@ typedef struct { SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; -} SMnodeOptions; +} SMnodeOpt; /* ------------------------ SMnode ------------------------ */ /** * @brief Open a mnode. * * @param path Path of the mnode - * @param pOptions Options of the mnode + * @param pOption Option of the mnode * @return SMnode* The mnode object */ -SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions); +SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption); /** * @brief Close a mnode @@ -76,10 +76,10 @@ void mnodeClose(SMnode *pMnode); * @brief Close a mnode * * @param pMnode The mnode object to close - * @param pOptions Options of the mnode + * @param pOption Options of the mnode * @return int32_t 0 for success, -1 for failure */ -int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions); +int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption); /** * @brief Drop a mnode. diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 9b3435a49e..0a764af8dc 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -58,8 +58,8 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode); static int32_t dndReadMnodeFile(SDnode *pDnode); static int32_t dndWriteMnodeFile(SDnode *pDnode); -static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions); -static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions); +static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption); +static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption); static int32_t dndDropMnode(SDnode *pDnode); static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); @@ -243,6 +243,7 @@ static bool dndNeedDeployMnode(SDnode *pDnode) { if (dndGetClusterId(pDnode) > 0) { return false; } + if (strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) != 0) { return false; } @@ -250,43 +251,49 @@ static bool dndNeedDeployMnode(SDnode *pDnode) { return true; } -static void dndInitMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions) { - pOptions->pDnode = pDnode; - pOptions->sendMsgToDnodeFp = dndSendMsgToDnode; - pOptions->sendMsgToMnodeFp = dndSendMsgToMnode; - pOptions->sendRedirectMsgFp = dndSendRedirectMsg; - pOptions->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; +static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { + pOption->pDnode = pDnode; + pOption->sendMsgToDnodeFp = dndSendMsgToDnode; + pOption->sendMsgToMnodeFp = dndSendMsgToMnode; + pOption->sendRedirectMsgFp = dndSendRedirectMsg; + pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; + pOption->dnodeId = dndGetDnodeId(pDnode); + pOption->clusterId = dndGetClusterId(pDnode); } -static int32_t dndBuildMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions, SCreateMnodeMsg *pMsg) { - dndInitMnodeOptions(pDnode, pOptions); +static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { + dndInitMnodeOption(pDnode, pOption); + pOption->replica = 1; + pOption->selfIndex = 0; + SReplica *pReplica = &pOption->replicas[0]; + pReplica->id = 1; + pReplica->port = pDnode->opt.serverPort; + tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); +} - if (pMsg == NULL) { - pOptions->dnodeId = 1; - pOptions->clusterId = 1234; - pOptions->replica = 1; - pOptions->selfIndex = 0; - SReplica *pReplica = &pOptions->replicas[0]; - pReplica->id = 1; - pReplica->port = pDnode->opt.serverPort; - tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); - } else { - pOptions->dnodeId = dndGetDnodeId(pDnode); - pOptions->clusterId = dndGetClusterId(pDnode); - pOptions->selfIndex = -1; - pOptions->replica = pMsg->replica; - for (int32_t index = 0; index < pMsg->replica; ++index) { - SReplica *pReplica = &pOptions->replicas[index]; - pReplica->id = pMsg->replicas[index].id; - pReplica->port = pMsg->replicas[index].port; - tstrncpy(pReplica->fqdn, pMsg->replicas[index].fqdn, TSDB_FQDN_LEN); - if (pReplica->id == pOptions->dnodeId) { - pOptions->selfIndex = index; - } +static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) { + dndInitMnodeOption(pDnode, pOption); + pOption->replica = 0; +} + +static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeMsg *pMsg) { + dndInitMnodeOption(pDnode, pOption); + pOption->dnodeId = dndGetDnodeId(pDnode); + pOption->clusterId = dndGetClusterId(pDnode); + + pOption->replica = pMsg->replica; + pOption->selfIndex = -1; + for (int32_t index = 0; index < pMsg->replica; ++index) { + SReplica *pReplica = &pOption->replicas[index]; + pReplica->id = pMsg->replicas[index].id; + pReplica->port = pMsg->replicas[index].port; + tstrncpy(pReplica->fqdn, pMsg->replicas[index].fqdn, TSDB_FQDN_LEN); + if (pReplica->id == pOption->dnodeId) { + pOption->selfIndex = index; } } - if (pOptions->selfIndex == -1) { + if (pOption->selfIndex == -1) { terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND; dError("failed to build mnode options since %s", terrstr()); return -1; @@ -295,7 +302,7 @@ static int32_t dndBuildMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions, SCr return 0; } -static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) { +static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = dndStartMnodeWorker(pDnode); @@ -304,7 +311,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) { return code; } - SMnode *pMnode = mnodeOpen(pDnode->dir.mnode, pOptions); + SMnode *pMnode = mnodeOpen(pDnode->dir.mnode, pOption); if (pMnode == NULL) { dError("failed to open mnode since %s", terrstr()); code = terrno; @@ -331,7 +338,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) { return 0; } -static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) { +static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); @@ -340,7 +347,7 @@ static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) { return -1; } - if (mnodeAlter(pMnode, pOptions) != 0) { + if (mnodeAlter(pMnode, pOption) != 0) { dError("failed to alter mnode since %s", terrstr()); dndReleaseMnode(pDnode, pMnode); return -1; @@ -399,8 +406,8 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; } else { - SMnodeOptions option = {0}; - if (dndBuildMnodeOptions(pDnode, &option, pMsg) != 0) { + SMnodeOpt option = {0}; + if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { return -1; } return dndOpenMnode(pDnode, &option); @@ -414,8 +421,8 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; } else { - SMnodeOptions option = {0}; - if (dndBuildMnodeOptions(pDnode, &option, pMsg) != 0) { + SMnodeOpt option = {0}; + if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { return -1; } return dndAlterMnode(pDnode, &option); @@ -625,7 +632,6 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - ; tWorkerCleanup(&pMgmt->mgmtPool); } @@ -737,7 +743,12 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { pPool->name = "mnode-sync"; pPool->min = 0; pPool->max = 1; - return tWorkerInit(pPool); + if (tWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return 0; } static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { @@ -781,13 +792,15 @@ int32_t dndInitMnode(SDnode *pDnode) { } dInfo("start to deploy mnode"); + SMnodeOpt option = {0}; + dndBuildMnodeDeployOption(pDnode, &option); + return dndOpenMnode(pDnode, &option); } else { dInfo("start to open mnode"); + SMnodeOpt option = {0}; + dndBuildMnodeOpenOption(pDnode, &option); + return dndOpenMnode(pDnode, &option); } - - SMnodeOptions option = {0}; - dndInitMnodeOptions(pDnode, &option); - return dndOpenMnode(pDnode, &option); } void dndCleanupMnode(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index ac3e55ffa7..fd66695e32 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -239,7 +239,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_ snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes); fp = fopen(file, "r"); - if (!fp) { + if (fp == NULL) { dDebug("file %s not exist", file); code = 0; goto PRASE_VNODE_OVER; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index aa0070cfa9..8d72f83200 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -77,19 +77,19 @@ static int32_t dndCheckRunning(char *dataDir) { return 0; } -static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) { - if (dndCheckRunning(pOptions->dataDir) != 0) { +static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) { + if (dndCheckRunning(pOption->dataDir) != 0) { return -1; } char path[PATH_MAX + 100]; - snprintf(path, sizeof(path), "%s%smnode", pOptions->dataDir, TD_DIRSEP); + snprintf(path, sizeof(path), "%s%smnode", pOption->dataDir, TD_DIRSEP); pDnode->dir.mnode = tstrdup(path); - snprintf(path, sizeof(path), "%s%svnode", pOptions->dataDir, TD_DIRSEP); + snprintf(path, sizeof(path), "%s%svnode", pOption->dataDir, TD_DIRSEP); pDnode->dir.vnodes = tstrdup(path); - snprintf(path, sizeof(path), "%s%sdnode", pOptions->dataDir, TD_DIRSEP); + snprintf(path, sizeof(path), "%s%sdnode", pOption->dataDir, TD_DIRSEP); pDnode->dir.dnode = tstrdup(path); if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) { @@ -116,7 +116,7 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) { return -1; } - memcpy(&pDnode->opt, pOptions, sizeof(SDnodeOpt)); + memcpy(&pDnode->opt, pOption, sizeof(SDnodeOpt)); return 0; } @@ -136,7 +136,7 @@ static void dndCleanupEnv(SDnode *pDnode) { taosStopCacheRefreshWorker(); } -SDnode *dndInit(SDnodeOpt *pOptions) { +SDnode *dndInit(SDnodeOpt *pOption) { taosIgnSIGPIPE(); taosBlockSIGPIPE(); taosResolveCRC(); @@ -151,7 +151,7 @@ SDnode *dndInit(SDnodeOpt *pOptions) { dInfo("start to initialize TDengine"); dndSetStat(pDnode, DND_STAT_INIT); - if (dndInitEnv(pDnode, pOptions) != 0) { + if (dndInitEnv(pDnode, pOption) != 0) { dError("failed to init env"); dndCleanup(pDnode); return NULL; diff --git a/source/dnode/mnode/impl/inc/mnodeInt.h b/source/dnode/mnode/impl/inc/mnodeInt.h index 43af281f27..7f7f91a3af 100644 --- a/source/dnode/mnode/impl/inc/mnodeInt.h +++ b/source/dnode/mnode/impl/inc/mnodeInt.h @@ -32,7 +32,7 @@ typedef struct SMnodeBak { tmr_h timer; SSteps *pInitSteps; SSteps *pStartSteps; - SMnodeOptions para; + SMnodeOpt para; MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX]; } SMnodeBak; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index bd89476cef..43dd57bbf8 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -77,17 +77,17 @@ static void mnodeCleanupTimer() { tmr_h mnodeGetTimer() { return tsMint.timer; } -static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOptions *pOptions) { - pMnode->dnodeId = pOptions->dnodeId; - pMnode->clusterId = pOptions->clusterId; - pMnode->replica = pOptions->replica; - pMnode->selfIndex = pOptions->selfIndex; - memcpy(&pMnode->replicas, pOptions->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); - pMnode->pServer = pOptions->pDnode; - pMnode->putMsgToApplyMsgFp = pOptions->putMsgToApplyMsgFp; - pMnode->sendMsgToDnodeFp = pOptions->sendMsgToDnodeFp; - pMnode->sendMsgToMnodeFp = pOptions->sendMsgToMnodeFp; - pMnode->sendRedirectMsgFp = pOptions->sendRedirectMsgFp; +static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { + pMnode->dnodeId = pOption->dnodeId; + pMnode->clusterId = pOption->clusterId; + pMnode->replica = pOption->replica; + pMnode->selfIndex = pOption->selfIndex; + memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); + pMnode->pServer = pOption->pDnode; + pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp; + pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; + pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; + pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { @@ -136,10 +136,10 @@ static int32_t mnodeAllocStartSteps() { return 0; } -SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) { +SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption) { SMnode *pMnode = calloc(1, sizeof(SMnode)); - if (mnodeSetOptions(pMnode, pOptions) != 0) { + if (mnodeSetOptions(pMnode, pOption) != 0) { free(pMnode); mError("failed to init mnode options since %s", terrstr()); return NULL; @@ -173,7 +173,7 @@ SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) { void mnodeClose(SMnode *pMnode) { free(pMnode); } -int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions) { return 0; } +int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; } void mnodeDestroy(const char *path) { sdbUnDeploy(); }