From 04ea23584f789bd5650b8f6dc845828616dd1269 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 26 Nov 2021 11:13:45 +0800 Subject: [PATCH] refact dnode --- include/common/tglobal.h | 4 - include/dnode/mnode/mnode.h | 2 +- include/dnode/vnode/vnode.h | 2 +- include/util/tdef.h | 6 - source/common/src/tglobal.c | 9 - source/dnode/mgmt/impl/inc/dndDnode.h | 2 +- source/dnode/mgmt/impl/inc/dndInt.h | 26 +- source/dnode/mgmt/impl/src/dndDnode.c | 411 +++++++++++++------------- source/dnode/mnode/impl/src/mnode.c | 2 +- 9 files changed, 215 insertions(+), 249 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f3fce8becd..f478e96766 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -28,10 +28,6 @@ extern char tsSecond[]; extern char tsLocalFqdn[]; extern char tsLocalEp[]; extern uint16_t tsServerPort; -extern uint16_t tsDnodeShellPort; -extern uint16_t tsDnodeDnodePort; -extern uint16_t tsSyncPort; -extern uint16_t tsArbitratorPort; extern int32_t tsStatusInterval; extern int32_t tsNumOfMnodes; extern int8_t tsEnableVnodeBak; diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 98aefc6db3..b7b05f896f 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -26,7 +26,7 @@ typedef struct SMnode SMnode; typedef struct SMnodeMsg SMnodeMsg; typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell); +typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnd, SMnodeMsg *pMsg); typedef struct SMnodeLoad { diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 586cb49d0f..8dc01b2a8a 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -187,7 +187,7 @@ typedef struct { typedef struct SDnode SDnode; typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell); +typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnd, int32_t vgId, SVnodeMsg *pMsg); typedef struct { diff --git a/include/util/tdef.h b/include/util/tdef.h index d61a3e7188..76df8887a0 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -358,12 +358,6 @@ do { \ #define TSDB_DEFAULT_STABLES_HASH_SIZE 100 #define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 -#define TSDB_PORT_DNODESHELL 0 -#define TSDB_PORT_DNODEDNODE 5 -#define TSDB_PORT_SYNC 10 -#define TSDB_PORT_HTTP 11 -#define TSDB_PORT_ARBITRATOR 12 - #define TSDB_MAX_WAL_SIZE (1024*1024*3) #define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 501936d354..8d57218a64 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -33,10 +33,6 @@ char tsArbitrator[TSDB_EP_LEN] = {0}; char tsLocalFqdn[TSDB_FQDN_LEN] = {0}; char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port uint16_t tsServerPort = 6030; -uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035] -uint16_t tsDnodeDnodePort = 6035; // udp/tcp -uint16_t tsSyncPort = 6040; -uint16_t tsArbitratorPort = 6042; int32_t tsStatusInterval = 1; // second int32_t tsNumOfMnodes = 1; int8_t tsEnableVnodeBak = 1; @@ -1726,11 +1722,6 @@ int32_t taosCheckGlobalCfg() { } } - tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035] - tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp - tsSyncPort = tsServerPort + TSDB_PORT_SYNC; - tsHttpPort = tsServerPort + TSDB_PORT_HTTP; - if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index ef16b1c8f0..590a9611e1 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -30,7 +30,7 @@ int32_t dndGetDnodeId(SDnode *pDnd); int64_t dndGetClusterId(SDnode *pDnd); void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet); -void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell); +void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index fba589c73a..258f40aad6 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -31,9 +31,10 @@ extern "C" { #include "tthread.h" #include "ttime.h" #include "tworker.h" + +#include "dnode.h" #include "mnode.h" #include "vnode.h" -#include "dnode.h" extern int32_t dDebugFlag; @@ -54,17 +55,16 @@ typedef struct { } SDnodeDir; typedef struct { - int32_t dnodeId; - uint32_t rebootTime; - int32_t dropped; - int64_t clusterId; - SEpSet shellEpSet; - SEpSet peerEpSet; - char *file; - SHashObj *dnodeHash; - SDnodeEps *dnodeEps; - pthread_t *threadId; - pthread_mutex_t mutex; + int32_t dnodeId; + int32_t dropped; + uint32_t rebootTime; + int64_t clusterId; + SEpSet mnodeEpSet; + char *file; + SHashObj *dnodeHash; + SDnodeEps *dnodeEps; + pthread_t *threadId; + SRWLatch latch; } SDnodeMgmt; typedef struct { @@ -108,7 +108,7 @@ typedef struct SDnode { EStat stat; SDnodeOpt opt; SDnodeDir dir; - SDnodeMgmt d; + SDnodeMgmt dmgmt; SMnodeMgmt m; SVnodesMgmt vmgmt; STransMgmt t; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 378d76e046..962ce2b73d 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -18,28 +18,32 @@ #include "dndTransport.h" #include "dndVnodes.h" -static inline void dndLockDnode(SDnode *pDnd) { pthread_mutex_lock(&pDnd->d.mutex); } +static inline void dndRLockDnode(SDnode *pDnode) { taosRLockLatch(&pDnode->dmgmt.latch); } -static inline void dndUnLockDnode(SDnode *pDnd) { pthread_mutex_unlock(&pDnd->d.mutex); } +static inline void dndRUnLockDnode(SDnode *pDnode) { taosRUnLockLatch(&pDnode->dmgmt.latch); } -int32_t dndGetDnodeId(SDnode *pDnd) { - dndLockDnode(pDnd); - int32_t dnodeId = pDnd->d.dnodeId; - dndUnLockDnode(pDnd); +static inline void dndWLockDnode(SDnode *pDnode) { taosWLockLatch(&pDnode->dmgmt.latch); } + +static inline void dndWUnLockDnode(SDnode *pDnode) { taosWUnLockLatch(&pDnode->dmgmt.latch); } + +int32_t dndGetDnodeId(SDnode *pDnode) { + dndRLockDnode(pDnode); + int32_t dnodeId = pDnode->dmgmt.dnodeId; + dndRUnLockDnode(pDnode); return dnodeId; } -int64_t dndGetClusterId(SDnode *pDnd) { - dndLockDnode(pDnd); - int64_t clusterId = pDnd->d.clusterId; - dndUnLockDnode(pDnd); +int64_t dndGetClusterId(SDnode *pDnode) { + dndRLockDnode(pDnode); + int64_t clusterId = pDnode->dmgmt.clusterId; + dndRUnLockDnode(pDnode); return clusterId; } -void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { - dndLockDnode(pDnd); +void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { + dndRLockDnode(pDnode); - SDnodeEp *pDnodeEp = taosHashGet(pDnd->d.dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *pDnodeEp = taosHashGet(pDnode->dmgmt.dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { if (pPort != NULL) { *pPort = pDnodeEp->port; @@ -52,41 +56,26 @@ void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16 } } - dndUnLockDnode(pDnd); + dndRUnLockDnode(pDnode); } -void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet) { - dndLockDnode(pDnd); - *pEpSet = pDnd->d.peerEpSet; - dndUnLockDnode(pDnd); +void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { + dndRLockDnode(pDnode); + *pEpSet = pDnode->dmgmt.mnodeEpSet; + dndRUnLockDnode(pDnode); } -void dndGetShellEpSet(SDnode *pDnd, SEpSet *pEpSet) { - dndLockDnode(pDnd); - *pEpSet = pDnd->d.shellEpSet; - dndUnLockDnode(pDnd); -} - -void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell) { +void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { int32_t msgType = pMsg->msgType; SEpSet epSet = {0}; - if (forShell) { - dndGetShellEpSet(pDnd, &epSet); - } else { - dndGetMnodeEpSet(pDnd, &epSet); - } - - dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, taosMsg[msgType], epSet.numOfEps, epSet.inUse); + dndGetMnodeEpSet(pDnode, &epSet); + dDebug("RPC %p, msg:%s is redirected, num:%d inUse:%d", pMsg->handle, taosMsg[msgType], epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); - if (strcmp(epSet.fqdn[i], pDnd->opt.localFqdn) == 0) { - if ((epSet.port[i] == pDnd->opt.serverPort + TSDB_PORT_DNODEDNODE && !forShell) || - (epSet.port[i] == pDnd->opt.serverPort && forShell)) { - epSet.inUse = (i + 1) % epSet.numOfEps; - dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse); - } + if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) { + epSet.inUse = (i + 1) % epSet.numOfEps; } epSet.port[i] = htons(epSet.port[i]); @@ -96,220 +85,218 @@ void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell) { } static void dndUpdateMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet) { - dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); + dInfo("mnode is changed, num:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse); - dndLockDnode(pDnd); + dndWLockDnode(pDnd); - pDnd->d.peerEpSet = *pEpSet; + pDnd->dmgmt.mnodeEpSet = *pEpSet; for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); } - pDnd->d.shellEpSet = *pEpSet; - dndUnLockDnode(pDnd); + dndWUnLockDnode(pDnd); } -static void dndPrintDnodes(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +static void dndPrintDnodes(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; - dDebug("print dnode endpoint list, num:%d", pDnode->dnodeEps->num); - for (int32_t i = 0; i < pDnode->dnodeEps->num; i++) { - SDnodeEp *pEp = &pDnode->dnodeEps->eps[i]; + dDebug("print dnode ep list, num:%d", pMgmt->dnodeEps->num); + for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { + SDnodeEp *pEp = &pMgmt->dnodeEps->eps[i]; dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->fqdn, pEp->port, pEp->isMnode); } } -static void dndResetDnodes(SDnode *pDnd, SDnodeEps *pDnodeEps) { - SDnodeMgmt *pDnode = &pDnd->d; +static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; int32_t size = sizeof(SDnodeEps) + pDnodeEps->num * sizeof(SDnodeEp); - - if (pDnodeEps->num > pDnode->dnodeEps->num) { + if (pDnodeEps->num > pMgmt->dnodeEps->num) { SDnodeEps *tmp = calloc(1, size); if (tmp == NULL) return; - tfree(pDnode->dnodeEps); - pDnode->dnodeEps = tmp; + tfree(pMgmt->dnodeEps); + pMgmt->dnodeEps = tmp; } - if (pDnode->dnodeEps != pDnodeEps) { - memcpy(pDnode->dnodeEps, pDnodeEps, size); + if (pMgmt->dnodeEps != pDnodeEps) { + memcpy(pMgmt->dnodeEps, pDnodeEps, size); } - pDnode->peerEpSet.inUse = 0; - pDnode->shellEpSet.inUse = 0; + pMgmt->mnodeEpSet.inUse = 0; int32_t mIndex = 0; - for (int32_t i = 0; i < pDnode->dnodeEps->num; i++) { - SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; + for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { + SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; if (!pDnodeEp->isMnode) continue; if (mIndex >= TSDB_MAX_REPLICA) continue; - strcpy(pDnode->shellEpSet.fqdn[mIndex], pDnodeEp->fqdn); - strcpy(pDnode->peerEpSet.fqdn[mIndex], pDnodeEp->fqdn); - pDnode->shellEpSet.port[mIndex] = pDnodeEp->port; - pDnode->shellEpSet.port[mIndex] = pDnodeEp->port + TSDB_PORT_DNODEDNODE; + strcpy(pMgmt->mnodeEpSet.fqdn[mIndex], pDnodeEp->fqdn); + pMgmt->mnodeEpSet.port[mIndex] = pDnodeEp->port; mIndex++; } - for (int32_t i = 0; i < pDnode->dnodeEps->num; ++i) { - SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; - taosHashPut(pDnode->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); + for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { + SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; + taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); } - dndPrintDnodes(pDnd); + dndPrintDnodes(pDnode); } -static bool dndIsEpChanged(SDnode *pDnd, int32_t dnodeId) { +static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) { bool changed = false; - dndLockDnode(pDnd); + dndRLockDnode(pDnode); - SDnodeEp *pDnodeEp = taosHashGet(pDnd->d.dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *pDnodeEp = taosHashGet(pDnode->dmgmt.dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { char epstr[TSDB_EP_LEN + 1]; snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port); - changed = strcmp(pDnd->opt.localEp, epstr) != 0; + changed = strcmp(pEp, epstr) != 0; } - dndUnLockDnode(pDnd); + dndRUnLockDnode(pDnode); return changed; } -static int32_t dndReadDnodes(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +static int32_t dndReadDnodes(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t len = 0; int32_t maxLen = 30000; char *content = calloc(1, maxLen + 1); cJSON *root = NULL; FILE *fp = NULL; - fp = fopen(pDnode->file, "r"); - if (!fp) { - dDebug("file %s not exist", pDnode->file); + fp = fopen(pMgmt->file, "r"); + if (fp == NULL) { + dDebug("file %s not exist", pMgmt->file); + code = 0; goto PRASE_DNODE_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", pDnode->file); + dError("failed to read %s since content is null", pMgmt->file); goto PRASE_DNODE_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", pDnode->file); + dError("failed to read %s since invalid json format", pMgmt->file); goto PRASE_DNODE_OVER; } cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s since dnodeId not found", pDnode->file); + dError("failed to read %s since dnodeId not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnode->dnodeId = atoi(dnodeId->valuestring); + pMgmt->dnodeId = atoi(dnodeId->valuestring); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { - dError("failed to read %s since clusterId not found", pDnode->file); + dError("failed to read %s since clusterId not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnode->clusterId = atoll(clusterId->valuestring); + pMgmt->clusterId = atoll(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_String) { - dError("failed to read %s since dropped not found", pDnode->file); + dError("failed to read %s since dropped not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnode->dropped = atoi(dropped->valuestring); + pMgmt->dropped = atoi(dropped->valuestring); cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { - dError("failed to read %s since dnodeInfos not found", pDnode->file); + dError("failed to read %s since dnodeInfos not found", pMgmt->file); goto PRASE_DNODE_OVER; } int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); if (dnodeInfosSize <= 0) { - dError("failed to read %s since dnodeInfos size:%d invalid", pDnode->file, dnodeInfosSize); + dError("failed to read %s since dnodeInfos size:%d invalid", pMgmt->file, dnodeInfosSize); goto PRASE_DNODE_OVER; } - pDnode->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); - if (pDnode->dnodeEps == NULL) { + pMgmt->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); + if (pMgmt->dnodeEps == NULL) { dError("failed to calloc dnodeEpList since %s", strerror(errno)); goto PRASE_DNODE_OVER; } - pDnode->dnodeEps->num = dnodeInfosSize; + pMgmt->dnodeEps->num = dnodeInfosSize; for (int32_t i = 0; i < dnodeInfosSize; ++i) { cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); if (dnodeInfo == NULL) break; - SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; + SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { - dError("failed to read %s, dnodeId not found", pDnode->file); + dError("failed to read %s, dnodeId not found", pMgmt->file); goto PRASE_DNODE_OVER; } pDnodeEp->id = atoi(dnodeId->valuestring); cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); if (!isMnode || isMnode->type != cJSON_String) { - dError("failed to read %s, isMnode not found", pDnode->file); + dError("failed to read %s, isMnode not found", pMgmt->file); goto PRASE_DNODE_OVER; } pDnodeEp->isMnode = atoi(isMnode->valuestring); cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { - dError("failed to read %s, dnodeFqdn not found", pDnode->file); + dError("failed to read %s, dnodeFqdn not found", pMgmt->file); goto PRASE_DNODE_OVER; } tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); if (!dnodePort || dnodePort->type != cJSON_String) { - dError("failed to read %s, dnodePort not found", pDnode->file); + dError("failed to read %s, dnodePort not found", pMgmt->file); goto PRASE_DNODE_OVER; } pDnodeEp->port = atoi(dnodePort->valuestring); } - dInfo("succcessed to read file %s", pDnode->file); - dndPrintDnodes(pDnd); + code = 0; + dInfo("succcessed to read file %s", pMgmt->file); + dndPrintDnodes(pDnode); PRASE_DNODE_OVER: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); - if (dndIsEpChanged(pDnd, pDnode->dnodeId)) { - dError("localEp %s different with %s and need reconfigured", pDnd->opt.localEp, pDnode->file); + if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->opt.localEp)) { + dError("localEp %s different with %s and need reconfigured", pDnode->opt.localEp, pMgmt->file); return -1; } - if (pDnode->dnodeEps == NULL) { - pDnode->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); - pDnode->dnodeEps->num = 1; - pDnode->dnodeEps->eps[0].port = pDnd->opt.serverPort; - tstrncpy(pDnode->dnodeEps->eps[0].fqdn, pDnd->opt.localFqdn, TSDB_FQDN_LEN); + if (pMgmt->dnodeEps == NULL) { + pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); + pMgmt->dnodeEps->num = 1; + pMgmt->dnodeEps->eps[0].port = pDnode->opt.serverPort; + tstrncpy(pMgmt->dnodeEps->eps[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); } - dndResetDnodes(pDnd, pDnode->dnodeEps); + dndResetDnodes(pDnode, pMgmt->dnodeEps); terrno = 0; return 0; } -static int32_t dndWriteDnodes(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +static int32_t dndWriteDnodes(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; - FILE *fp = fopen(pDnode->file, "w"); - if (!fp) { - dError("failed to write %s since %s", pDnode->file, strerror(errno)); + FILE *fp = fopen(pMgmt->file, "w"); + if (fp == NULL) { + dError("failed to write %s since %s", pMgmt->file, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -318,17 +305,17 @@ static int32_t dndWriteDnodes(SDnode *pDnd) { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnode->dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pDnode->clusterId); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pDnode->dropped); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pMgmt->dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped); len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); - for (int32_t i = 0; i < pDnode->dnodeEps->num; ++i) { - SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; + for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { + SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnodeEp->id); len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", pDnodeEp->isMnode); len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", pDnodeEp->fqdn); len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", pDnodeEp->port); - if (i < pDnode->dnodeEps->num - 1) { + if (i < pMgmt->dnodeEps->num - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { len += snprintf(content + len, maxLen - len, " }]\n"); @@ -342,105 +329,105 @@ static int32_t dndWriteDnodes(SDnode *pDnd) { free(content); terrno = 0; - dInfo("successed to write %s", pDnode->file); + dInfo("successed to write %s", pMgmt->file); return 0; } -static void dndSendStatusMsg(SDnode *pDnd) { - int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); +static void dndSendStatusMsg(SDnode *pDnode) { + int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + SStatusMsg *pStatus = rpcMallocCont(contLen); if (pStatus == NULL) { dError("failed to malloc status message"); return; } - dndLockDnode(pDnd); - pStatus->sversion = htonl(pDnd->opt.sver); - pStatus->dnodeId = htonl(pDnd->d.dnodeId); - pStatus->clusterId = htobe64(pDnd->d.clusterId); - pStatus->rebootTime = htonl(pDnd->d.rebootTime); - pStatus->numOfCores = htonl(pDnd->opt.numOfCores); - tstrncpy(pStatus->dnodeEp, pDnd->opt.localEp, TSDB_EP_LEN); - pStatus->clusterCfg.statusInterval = htonl(pDnd->opt.statusInterval); - tstrncpy(pStatus->clusterCfg.timezone, pDnd->opt.timezone, TSDB_TIMEZONE_LEN); - tstrncpy(pStatus->clusterCfg.locale, pDnd->opt.locale, TSDB_LOCALE_LEN); - tstrncpy(pStatus->clusterCfg.charset, pDnd->opt.charset, TSDB_LOCALE_LEN); + dndRLockDnode(pDnode); + pStatus->sversion = htonl(pDnode->opt.sver); + pStatus->dnodeId = htonl(pDnode->dmgmt.dnodeId); + pStatus->clusterId = htobe64(pDnode->dmgmt.clusterId); + pStatus->rebootTime = htonl(pDnode->dmgmt.rebootTime); + pStatus->numOfCores = htonl(pDnode->opt.numOfCores); + tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); + pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); + tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN); + tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN); + tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN); pStatus->clusterCfg.checkTime = 0; char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - dndUnLockDnode(pDnd); + dndRUnLockDnode(pDnode); - dndGetVnodeLoads(pDnd, &pStatus->vnodeLoads); + dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; - dndSendMsgToMnode(pDnd, &rpcMsg); + dndSendMsgToMnode(pDnode, &rpcMsg); } -static void dndUpdateDnodeCfg(SDnode *pDnd, SDnodeCfg *pCfg) { - SDnodeMgmt *pDnode = &pDnd->d; - if (pDnode->dnodeId != 0 && pDnode->dropped != pCfg->dropped) return; +static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) { + dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); - dndLockDnode(pDnd); - - pDnode->dnodeId = pCfg->dnodeId; - pDnode->clusterId = pCfg->clusterId; - pDnode->dropped = pCfg->dropped; - dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); - - dndWriteDnodes(pDnd); - dndUnLockDnode(pDnd); + dndWLockDnode(pDnode); + pMgmt->dnodeId = pCfg->dnodeId; + pMgmt->clusterId = pCfg->clusterId; + pMgmt->dropped = pCfg->dropped; + (void)dndWriteDnodes(pDnode); + dndWUnLockDnode(pDnode); + } } -static void dndUpdateDnodeEps(SDnode *pDnd, SDnodeEps *pDnodeEps) { +static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) { if (pDnodeEps == NULL || pDnodeEps->num <= 0) return; - dndLockDnode(pDnd); + dndWLockDnode(pDnode); - if (pDnodeEps->num != pDnd->d.dnodeEps->num) { - dndResetDnodes(pDnd, pDnodeEps); - dndWriteDnodes(pDnd); + if (pDnodeEps->num != pDnode->dmgmt.dnodeEps->num) { + dndResetDnodes(pDnode, pDnodeEps); + dndWriteDnodes(pDnode); } else { int32_t size = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps); - if (memcmp(pDnd->d.dnodeEps, pDnodeEps, size) != 0) { - dndResetDnodes(pDnd, pDnodeEps); - dndWriteDnodes(pDnd); + if (memcmp(pDnode->dmgmt.dnodeEps, pDnodeEps, size) != 0) { + dndResetDnodes(pDnode, pDnodeEps); + dndWriteDnodes(pDnode); } } - dndUnLockDnode(pDnd); + dndWUnLockDnode(pDnode); } -static void dndProcessStatusRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { +static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { if (pEpSet && pEpSet->numOfEps > 0) { - dndUpdateMnodeEpSet(pDnd, pEpSet); + dndUpdateMnodeEpSet(pDnode, pEpSet); } if (pMsg->code != TSDB_CODE_SUCCESS) return; - SStatusRsp *pStatusRsp = pMsg->pCont; - SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; + SStatusRsp *pRsp = pMsg->pCont; + SDnodeCfg *pCfg = &pRsp->dnodeCfg; pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->clusterId = htobe64(pCfg->clusterId); - dndUpdateDnodeCfg(pDnd, pCfg); + dndUpdateDnodeCfg(pDnode, pCfg); if (pCfg->dropped) return; - SDnodeEps *pDnodeEps = &pStatusRsp->dnodeEps; + SDnodeEps *pDnodeEps = &pRsp->dnodeEps; pDnodeEps->num = htonl(pDnodeEps->num); for (int32_t i = 0; i < pDnodeEps->num; ++i) { pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); } - dndUpdateDnodeEps(pDnd, pDnodeEps); + dndUpdateDnodeEps(pDnode, pDnodeEps); } -static void dndProcessAuthRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } +static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } -static void dndProcessGrantRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } +static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } -static void dndProcessConfigDnodeReq(SDnode *pDnd, SRpcMsg *pMsg) { +static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { SCfgDnodeMsg *pCfg = pMsg->pCont; int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; @@ -449,11 +436,11 @@ static void dndProcessConfigDnodeReq(SDnode *pDnd, SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static void dndProcessStartupReq(SDnode *pDnd, SRpcMsg *pMsg) { +static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { dInfo("startup msg is received"); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); - dndGetStartup(pDnd, pStartup); + dndGetStartup(pDnode, pStartup); dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); @@ -463,52 +450,52 @@ static void dndProcessStartupReq(SDnode *pDnd, SRpcMsg *pMsg) { } static void *dnodeThreadRoutine(void *param) { - SDnode *pDnd = param; - int32_t ms = pDnd->opt.statusInterval * 1000; + SDnode *pDnode = param; + int32_t ms = pDnode->opt.statusInterval * 1000; while (true) { taosMsleep(ms); - if (dndGetStat(pDnd) != DND_STAT_RUNNING) { + if (dndGetStat(pDnode) != DND_STAT_RUNNING) { continue; } pthread_testcancel(); - dndSendStatusMsg(pDnd); + dndSendStatusMsg(pDnode); } } -int32_t dndInitDnode(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +int32_t dndInitDnode(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; - pDnode->dnodeId = 0; - pDnode->rebootTime = taosGetTimestampSec(); - pDnode->dropped = 0; - pDnode->clusterId = 0; + pMgmt->dnodeId = 0; + pMgmt->rebootTime = taosGetTimestampSec(); + pMgmt->dropped = 0; + pMgmt->clusterId = 0; char path[PATH_MAX]; - snprintf(path, PATH_MAX, "%s/dnode.json", pDnd->dir.dnode); - pDnode->file = strdup(path); - if (pDnode->file == NULL) { + snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode); + pMgmt->file = strdup(path); + if (pMgmt->file == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pDnode->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (pDnode->dnodeHash == NULL) { + pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pMgmt->dnodeHash == NULL) { dError("failed to init dnode hash"); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (dndReadDnodes(pDnd) != 0) { - dError("failed to read file:%s since %s", pDnode->file, terrstr()); + if (dndReadDnodes(pDnode) != 0) { + dError("failed to read file:%s since %s", pMgmt->file, terrstr()); return -1; } - pthread_mutex_init(&pDnode->mutex, NULL); + taosInitRWLatch(&pMgmt->latch); - pDnode->threadId = taosCreateThread(dnodeThreadRoutine, pDnd); - if (pDnode->threadId == NULL) { + pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); + if (pMgmt->threadId == NULL) { dError("failed to init dnode thread"); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -518,44 +505,42 @@ int32_t dndInitDnode(SDnode *pDnd) { return 0; } -void dndCleanupDnode(SDnode *pDnd) { - SDnodeMgmt *pDnode = &pDnd->d; +void dndCleanupDnode(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; - if (pDnode->threadId != NULL) { - taosDestoryThread(pDnode->threadId); - pDnode->threadId = NULL; + if (pMgmt->threadId != NULL) { + taosDestoryThread(pMgmt->threadId); + pMgmt->threadId = NULL; } - dndLockDnode(pDnd); + dndWLockDnode(pDnode); - if (pDnode->dnodeEps != NULL) { - free(pDnode->dnodeEps); - pDnode->dnodeEps = NULL; + if (pMgmt->dnodeEps != NULL) { + free(pMgmt->dnodeEps); + pMgmt->dnodeEps = NULL; } - if (pDnode->dnodeHash != NULL) { - taosHashCleanup(pDnode->dnodeHash); - pDnode->dnodeHash = NULL; + if (pMgmt->dnodeHash != NULL) { + taosHashCleanup(pMgmt->dnodeHash); + pMgmt->dnodeHash = NULL; } - if (pDnode->file != NULL) { - free(pDnode->file); - pDnode->file = NULL; + if (pMgmt->file != NULL) { + free(pMgmt->file); + pMgmt->file = NULL; } - dndUnLockDnode(pDnd); - pthread_mutex_destroy(&pDnode->mutex); - + dndWUnLockDnode(pDnode); dInfo("dnd-dnode is cleaned up"); } -void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { +void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { switch (pMsg->msgType) { case TSDB_MSG_TYPE_NETWORK_TEST: - dndProcessStartupReq(pDnd, pMsg); + dndProcessStartupReq(pDnode, pMsg); break; case TSDB_MSG_TYPE_CONFIG_DNODE_IN: - dndProcessConfigDnodeReq(pDnd, pMsg); + dndProcessConfigDnodeReq(pDnode, pMsg); break; default: dError("RPC %p, dnode req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); @@ -565,16 +550,16 @@ void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { } } -void dndProcessDnodeRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { +void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { switch (pMsg->msgType) { case TSDB_MSG_TYPE_STATUS_RSP: - dndProcessStatusRsp(pDnd, pMsg, pEpSet); + dndProcessStatusRsp(pDnode, pMsg, pEpSet); break; case TSDB_MSG_TYPE_AUTH_RSP: - dndProcessAuthRsp(pDnd, pMsg, pEpSet); + dndProcessAuthRsp(pDnode, pMsg, pEpSet); break; case TSDB_MSG_TYPE_GRANT_RSP: - dndProcessGrantRsp(pDnd, pMsg, pEpSet); + dndProcessGrantRsp(pDnode, pMsg, pEpSet); break; default: dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 5eff8a37ca..bd89476cef 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -53,7 +53,7 @@ void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg) { void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell) { assert(pMnode); - (*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg, forShell); + (*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg); } static int32_t mnodeInitTimer() {