refact dnode

This commit is contained in:
Shengliang Guan 2021-11-26 11:13:45 +08:00
parent 19865f3b33
commit 04ea23584f
9 changed files with 215 additions and 249 deletions

View File

@ -28,10 +28,6 @@ extern char tsSecond[];
extern char tsLocalFqdn[]; extern char tsLocalFqdn[];
extern char tsLocalEp[]; extern char tsLocalEp[];
extern uint16_t tsServerPort; extern uint16_t tsServerPort;
extern uint16_t tsDnodeShellPort;
extern uint16_t tsDnodeDnodePort;
extern uint16_t tsSyncPort;
extern uint16_t tsArbitratorPort;
extern int32_t tsStatusInterval; extern int32_t tsStatusInterval;
extern int32_t tsNumOfMnodes; extern int32_t tsNumOfMnodes;
extern int8_t tsEnableVnodeBak; extern int8_t tsEnableVnodeBak;

View File

@ -26,7 +26,7 @@ typedef struct SMnode SMnode;
typedef struct SMnodeMsg SMnodeMsg; typedef struct SMnodeMsg SMnodeMsg;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell); typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnd, SMnodeMsg *pMsg); typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnd, SMnodeMsg *pMsg);
typedef struct SMnodeLoad { typedef struct SMnodeLoad {

View File

@ -187,7 +187,7 @@ typedef struct {
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell); typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnd, int32_t vgId, SVnodeMsg *pMsg); typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnd, int32_t vgId, SVnodeMsg *pMsg);
typedef struct { typedef struct {

View File

@ -358,12 +358,6 @@ do { \
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100 #define TSDB_DEFAULT_STABLES_HASH_SIZE 100
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 #define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
#define TSDB_PORT_DNODESHELL 0
#define TSDB_PORT_DNODEDNODE 5
#define TSDB_PORT_SYNC 10
#define TSDB_PORT_HTTP 11
#define TSDB_PORT_ARBITRATOR 12
#define TSDB_MAX_WAL_SIZE (1024*1024*3) #define TSDB_MAX_WAL_SIZE (1024*1024*3)
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P #define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P

View File

@ -33,10 +33,6 @@ char tsArbitrator[TSDB_EP_LEN] = {0};
char tsLocalFqdn[TSDB_FQDN_LEN] = {0}; char tsLocalFqdn[TSDB_FQDN_LEN] = {0};
char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port
uint16_t tsServerPort = 6030; uint16_t tsServerPort = 6030;
uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035]
uint16_t tsDnodeDnodePort = 6035; // udp/tcp
uint16_t tsSyncPort = 6040;
uint16_t tsArbitratorPort = 6042;
int32_t tsStatusInterval = 1; // second int32_t tsStatusInterval = 1; // second
int32_t tsNumOfMnodes = 1; int32_t tsNumOfMnodes = 1;
int8_t tsEnableVnodeBak = 1; int8_t tsEnableVnodeBak = 1;
@ -1726,11 +1722,6 @@ int32_t taosCheckGlobalCfg() {
} }
} }
tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035]
tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
tsHttpPort = tsServerPort + TSDB_PORT_HTTP;
if (tsQueryBufferSize >= 0) { if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
} }

View File

@ -30,7 +30,7 @@ int32_t dndGetDnodeId(SDnode *pDnd);
int64_t dndGetClusterId(SDnode *pDnd); int64_t dndGetClusterId(SDnode *pDnd);
void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet); void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell); void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -31,9 +31,10 @@ extern "C" {
#include "tthread.h" #include "tthread.h"
#include "ttime.h" #include "ttime.h"
#include "tworker.h" #include "tworker.h"
#include "dnode.h"
#include "mnode.h" #include "mnode.h"
#include "vnode.h" #include "vnode.h"
#include "dnode.h"
extern int32_t dDebugFlag; extern int32_t dDebugFlag;
@ -55,16 +56,15 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
uint32_t rebootTime;
int32_t dropped; int32_t dropped;
uint32_t rebootTime;
int64_t clusterId; int64_t clusterId;
SEpSet shellEpSet; SEpSet mnodeEpSet;
SEpSet peerEpSet;
char *file; char *file;
SHashObj *dnodeHash; SHashObj *dnodeHash;
SDnodeEps *dnodeEps; SDnodeEps *dnodeEps;
pthread_t *threadId; pthread_t *threadId;
pthread_mutex_t mutex; SRWLatch latch;
} SDnodeMgmt; } SDnodeMgmt;
typedef struct { typedef struct {
@ -108,7 +108,7 @@ typedef struct SDnode {
EStat stat; EStat stat;
SDnodeOpt opt; SDnodeOpt opt;
SDnodeDir dir; SDnodeDir dir;
SDnodeMgmt d; SDnodeMgmt dmgmt;
SMnodeMgmt m; SMnodeMgmt m;
SVnodesMgmt vmgmt; SVnodesMgmt vmgmt;
STransMgmt t; STransMgmt t;

View File

@ -18,28 +18,32 @@
#include "dndTransport.h" #include "dndTransport.h"
#include "dndVnodes.h" #include "dndVnodes.h"
static inline void dndLockDnode(SDnode *pDnd) { pthread_mutex_lock(&pDnd->d.mutex); } static inline void dndRLockDnode(SDnode *pDnode) { taosRLockLatch(&pDnode->dmgmt.latch); }
static inline void dndUnLockDnode(SDnode *pDnd) { pthread_mutex_unlock(&pDnd->d.mutex); } static inline void dndRUnLockDnode(SDnode *pDnode) { taosRUnLockLatch(&pDnode->dmgmt.latch); }
int32_t dndGetDnodeId(SDnode *pDnd) { static inline void dndWLockDnode(SDnode *pDnode) { taosWLockLatch(&pDnode->dmgmt.latch); }
dndLockDnode(pDnd);
int32_t dnodeId = pDnd->d.dnodeId; static inline void dndWUnLockDnode(SDnode *pDnode) { taosWUnLockLatch(&pDnode->dmgmt.latch); }
dndUnLockDnode(pDnd);
int32_t dndGetDnodeId(SDnode *pDnode) {
dndRLockDnode(pDnode);
int32_t dnodeId = pDnode->dmgmt.dnodeId;
dndRUnLockDnode(pDnode);
return dnodeId; return dnodeId;
} }
int64_t dndGetClusterId(SDnode *pDnd) { int64_t dndGetClusterId(SDnode *pDnode) {
dndLockDnode(pDnd); dndRLockDnode(pDnode);
int64_t clusterId = pDnd->d.clusterId; int64_t clusterId = pDnode->dmgmt.clusterId;
dndUnLockDnode(pDnd); dndRUnLockDnode(pDnode);
return clusterId; return clusterId;
} }
void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
dndLockDnode(pDnd); dndRLockDnode(pDnode);
SDnodeEp *pDnodeEp = taosHashGet(pDnd->d.dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pDnode->dmgmt.dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
if (pPort != NULL) { if (pPort != NULL) {
*pPort = pDnodeEp->port; *pPort = pDnodeEp->port;
@ -52,41 +56,26 @@ void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16
} }
} }
dndUnLockDnode(pDnd); dndRUnLockDnode(pDnode);
} }
void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet) { void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
dndLockDnode(pDnd); dndRLockDnode(pDnode);
*pEpSet = pDnd->d.peerEpSet; *pEpSet = pDnode->dmgmt.mnodeEpSet;
dndUnLockDnode(pDnd); dndRUnLockDnode(pDnode);
} }
void dndGetShellEpSet(SDnode *pDnd, SEpSet *pEpSet) { void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
dndLockDnode(pDnd);
*pEpSet = pDnd->d.shellEpSet;
dndUnLockDnode(pDnd);
}
void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell) {
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
SEpSet epSet = {0}; SEpSet epSet = {0};
if (forShell) { dndGetMnodeEpSet(pDnode, &epSet);
dndGetShellEpSet(pDnd, &epSet);
} else {
dndGetMnodeEpSet(pDnd, &epSet);
}
dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, taosMsg[msgType], epSet.numOfEps, epSet.inUse);
dDebug("RPC %p, msg:%s is redirected, num:%d inUse:%d", pMsg->handle, taosMsg[msgType], epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], pDnd->opt.localFqdn) == 0) { if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) {
if ((epSet.port[i] == pDnd->opt.serverPort + TSDB_PORT_DNODEDNODE && !forShell) ||
(epSet.port[i] == pDnd->opt.serverPort && forShell)) {
epSet.inUse = (i + 1) % epSet.numOfEps; epSet.inUse = (i + 1) % epSet.numOfEps;
dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse);
}
} }
epSet.port[i] = htons(epSet.port[i]); epSet.port[i] = htons(epSet.port[i]);
@ -96,220 +85,218 @@ void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell) {
} }
static void dndUpdateMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet) { static void dndUpdateMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); dInfo("mnode is changed, num:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
dndLockDnode(pDnd); dndWLockDnode(pDnd);
pDnd->d.peerEpSet = *pEpSet; pDnd->dmgmt.mnodeEpSet = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
} }
pDnd->d.shellEpSet = *pEpSet;
dndUnLockDnode(pDnd); dndWUnLockDnode(pDnd);
} }
static void dndPrintDnodes(SDnode *pDnd) { static void dndPrintDnodes(SDnode *pDnode) {
SDnodeMgmt *pDnode = &pDnd->d; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dDebug("print dnode endpoint list, num:%d", pDnode->dnodeEps->num); dDebug("print dnode ep list, num:%d", pMgmt->dnodeEps->num);
for (int32_t i = 0; i < pDnode->dnodeEps->num; i++) { for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) {
SDnodeEp *pEp = &pDnode->dnodeEps->eps[i]; SDnodeEp *pEp = &pMgmt->dnodeEps->eps[i];
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->fqdn, pEp->port, pEp->isMnode); dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->fqdn, pEp->port, pEp->isMnode);
} }
} }
static void dndResetDnodes(SDnode *pDnd, SDnodeEps *pDnodeEps) { static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) {
SDnodeMgmt *pDnode = &pDnd->d; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
int32_t size = sizeof(SDnodeEps) + pDnodeEps->num * sizeof(SDnodeEp); int32_t size = sizeof(SDnodeEps) + pDnodeEps->num * sizeof(SDnodeEp);
if (pDnodeEps->num > pMgmt->dnodeEps->num) {
if (pDnodeEps->num > pDnode->dnodeEps->num) {
SDnodeEps *tmp = calloc(1, size); SDnodeEps *tmp = calloc(1, size);
if (tmp == NULL) return; if (tmp == NULL) return;
tfree(pDnode->dnodeEps); tfree(pMgmt->dnodeEps);
pDnode->dnodeEps = tmp; pMgmt->dnodeEps = tmp;
} }
if (pDnode->dnodeEps != pDnodeEps) { if (pMgmt->dnodeEps != pDnodeEps) {
memcpy(pDnode->dnodeEps, pDnodeEps, size); memcpy(pMgmt->dnodeEps, pDnodeEps, size);
} }
pDnode->peerEpSet.inUse = 0; pMgmt->mnodeEpSet.inUse = 0;
pDnode->shellEpSet.inUse = 0;
int32_t mIndex = 0; int32_t mIndex = 0;
for (int32_t i = 0; i < pDnode->dnodeEps->num; i++) { for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) {
SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
if (!pDnodeEp->isMnode) continue; if (!pDnodeEp->isMnode) continue;
if (mIndex >= TSDB_MAX_REPLICA) continue; if (mIndex >= TSDB_MAX_REPLICA) continue;
strcpy(pDnode->shellEpSet.fqdn[mIndex], pDnodeEp->fqdn); strcpy(pMgmt->mnodeEpSet.fqdn[mIndex], pDnodeEp->fqdn);
strcpy(pDnode->peerEpSet.fqdn[mIndex], pDnodeEp->fqdn); pMgmt->mnodeEpSet.port[mIndex] = pDnodeEp->port;
pDnode->shellEpSet.port[mIndex] = pDnodeEp->port;
pDnode->shellEpSet.port[mIndex] = pDnodeEp->port + TSDB_PORT_DNODEDNODE;
mIndex++; mIndex++;
} }
for (int32_t i = 0; i < pDnode->dnodeEps->num; ++i) { for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
taosHashPut(pDnode->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
} }
dndPrintDnodes(pDnd); dndPrintDnodes(pDnode);
} }
static bool dndIsEpChanged(SDnode *pDnd, int32_t dnodeId) { static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) {
bool changed = false; bool changed = false;
dndLockDnode(pDnd); dndRLockDnode(pDnode);
SDnodeEp *pDnodeEp = taosHashGet(pDnd->d.dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pDnode->dmgmt.dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
char epstr[TSDB_EP_LEN + 1]; char epstr[TSDB_EP_LEN + 1];
snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port); snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port);
changed = strcmp(pDnd->opt.localEp, epstr) != 0; changed = strcmp(pEp, epstr) != 0;
} }
dndUnLockDnode(pDnd); dndRUnLockDnode(pDnode);
return changed; return changed;
} }
static int32_t dndReadDnodes(SDnode *pDnd) { static int32_t dndReadDnodes(SDnode *pDnode) {
SDnodeMgmt *pDnode = &pDnd->d; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 30000; int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
cJSON *root = NULL; cJSON *root = NULL;
FILE *fp = NULL; FILE *fp = NULL;
fp = fopen(pDnode->file, "r"); fp = fopen(pMgmt->file, "r");
if (!fp) { if (fp == NULL) {
dDebug("file %s not exist", pDnode->file); dDebug("file %s not exist", pMgmt->file);
code = 0;
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
len = (int32_t)fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s since content is null", pDnode->file); dError("failed to read %s since content is null", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
content[len] = 0; content[len] = 0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
dError("failed to read %s since invalid json format", pDnode->file); dError("failed to read %s since invalid json format", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) { if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s since dnodeId not found", pDnode->file); dError("failed to read %s since dnodeId not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnode->dnodeId = atoi(dnodeId->valuestring); pMgmt->dnodeId = atoi(dnodeId->valuestring);
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) { if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", pDnode->file); dError("failed to read %s since clusterId not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnode->clusterId = atoll(clusterId->valuestring); pMgmt->clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) { if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", pDnode->file); dError("failed to read %s since dropped not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnode->dropped = atoi(dropped->valuestring); pMgmt->dropped = atoi(dropped->valuestring);
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s since dnodeInfos not found", pDnode->file); dError("failed to read %s since dnodeInfos not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize <= 0) { if (dnodeInfosSize <= 0) {
dError("failed to read %s since dnodeInfos size:%d invalid", pDnode->file, dnodeInfosSize); dError("failed to read %s since dnodeInfos size:%d invalid", pMgmt->file, dnodeInfosSize);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnode->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); pMgmt->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (pDnode->dnodeEps == NULL) { if (pMgmt->dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno)); dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnode->dnodeEps->num = dnodeInfosSize; pMgmt->dnodeEps->num = dnodeInfosSize;
for (int32_t i = 0; i < dnodeInfosSize; ++i) { for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break; if (dnodeInfo == NULL) break;
SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) { if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s, dnodeId not found", pDnode->file); dError("failed to read %s, dnodeId not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnodeEp->id = atoi(dnodeId->valuestring); pDnodeEp->id = atoi(dnodeId->valuestring);
cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
if (!isMnode || isMnode->type != cJSON_String) { if (!isMnode || isMnode->type != cJSON_String) {
dError("failed to read %s, isMnode not found", pDnode->file); dError("failed to read %s, isMnode not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnodeEp->isMnode = atoi(isMnode->valuestring); pDnodeEp->isMnode = atoi(isMnode->valuestring);
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", pDnode->file); dError("failed to read %s, dnodeFqdn not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_String) { if (!dnodePort || dnodePort->type != cJSON_String) {
dError("failed to read %s, dnodePort not found", pDnode->file); dError("failed to read %s, dnodePort not found", pMgmt->file);
goto PRASE_DNODE_OVER; goto PRASE_DNODE_OVER;
} }
pDnodeEp->port = atoi(dnodePort->valuestring); pDnodeEp->port = atoi(dnodePort->valuestring);
} }
dInfo("succcessed to read file %s", pDnode->file); code = 0;
dndPrintDnodes(pDnd); dInfo("succcessed to read file %s", pMgmt->file);
dndPrintDnodes(pDnode);
PRASE_DNODE_OVER: PRASE_DNODE_OVER:
if (content != NULL) free(content); if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp); if (fp != NULL) fclose(fp);
if (dndIsEpChanged(pDnd, pDnode->dnodeId)) { if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->opt.localEp)) {
dError("localEp %s different with %s and need reconfigured", pDnd->opt.localEp, pDnode->file); dError("localEp %s different with %s and need reconfigured", pDnode->opt.localEp, pMgmt->file);
return -1; return -1;
} }
if (pDnode->dnodeEps == NULL) { if (pMgmt->dnodeEps == NULL) {
pDnode->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
pDnode->dnodeEps->num = 1; pMgmt->dnodeEps->num = 1;
pDnode->dnodeEps->eps[0].port = pDnd->opt.serverPort; pMgmt->dnodeEps->eps[0].port = pDnode->opt.serverPort;
tstrncpy(pDnode->dnodeEps->eps[0].fqdn, pDnd->opt.localFqdn, TSDB_FQDN_LEN); tstrncpy(pMgmt->dnodeEps->eps[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
} }
dndResetDnodes(pDnd, pDnode->dnodeEps); dndResetDnodes(pDnode, pMgmt->dnodeEps);
terrno = 0; terrno = 0;
return 0; return 0;
} }
static int32_t dndWriteDnodes(SDnode *pDnd) { static int32_t dndWriteDnodes(SDnode *pDnode) {
SDnodeMgmt *pDnode = &pDnd->d; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
FILE *fp = fopen(pDnode->file, "w"); FILE *fp = fopen(pMgmt->file, "w");
if (!fp) { if (fp == NULL) {
dError("failed to write %s since %s", pDnode->file, strerror(errno)); dError("failed to write %s since %s", pMgmt->file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
@ -318,17 +305,17 @@ static int32_t dndWriteDnodes(SDnode *pDnd) {
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnode->dnodeId); len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pMgmt->dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pDnode->clusterId); len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pDnode->dropped); len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < pDnode->dnodeEps->num; ++i) { for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
SDnodeEp *pDnodeEp = &pDnode->dnodeEps->eps[i]; SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnodeEp->id); len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnodeEp->id);
len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", pDnodeEp->isMnode); len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", pDnodeEp->isMnode);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", pDnodeEp->fqdn); len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", pDnodeEp->fqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", pDnodeEp->port); len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", pDnodeEp->port);
if (i < pDnode->dnodeEps->num - 1) { if (i < pMgmt->dnodeEps->num - 1) {
len += snprintf(content + len, maxLen - len, " },{\n"); len += snprintf(content + len, maxLen - len, " },{\n");
} else { } else {
len += snprintf(content + len, maxLen - len, " }]\n"); len += snprintf(content + len, maxLen - len, " }]\n");
@ -342,105 +329,105 @@ static int32_t dndWriteDnodes(SDnode *pDnd) {
free(content); free(content);
terrno = 0; terrno = 0;
dInfo("successed to write %s", pDnode->file); dInfo("successed to write %s", pMgmt->file);
return 0; return 0;
} }
static void dndSendStatusMsg(SDnode *pDnd) { static void dndSendStatusMsg(SDnode *pDnode) {
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen); SStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) { if (pStatus == NULL) {
dError("failed to malloc status message"); dError("failed to malloc status message");
return; return;
} }
dndLockDnode(pDnd); dndRLockDnode(pDnode);
pStatus->sversion = htonl(pDnd->opt.sver); pStatus->sversion = htonl(pDnode->opt.sver);
pStatus->dnodeId = htonl(pDnd->d.dnodeId); pStatus->dnodeId = htonl(pDnode->dmgmt.dnodeId);
pStatus->clusterId = htobe64(pDnd->d.clusterId); pStatus->clusterId = htobe64(pDnode->dmgmt.clusterId);
pStatus->rebootTime = htonl(pDnd->d.rebootTime); pStatus->rebootTime = htonl(pDnode->dmgmt.rebootTime);
pStatus->numOfCores = htonl(pDnd->opt.numOfCores); pStatus->numOfCores = htonl(pDnode->opt.numOfCores);
tstrncpy(pStatus->dnodeEp, pDnd->opt.localEp, TSDB_EP_LEN); tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(pDnd->opt.statusInterval); pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
tstrncpy(pStatus->clusterCfg.timezone, pDnd->opt.timezone, TSDB_TIMEZONE_LEN); tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN);
tstrncpy(pStatus->clusterCfg.locale, pDnd->opt.locale, TSDB_LOCALE_LEN); tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, pDnd->opt.charset, TSDB_LOCALE_LEN); tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN);
pStatus->clusterCfg.checkTime = 0; pStatus->clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
dndUnLockDnode(pDnd); dndRUnLockDnode(pDnode);
dndGetVnodeLoads(pDnd, &pStatus->vnodeLoads); dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS};
dndSendMsgToMnode(pDnd, &rpcMsg); dndSendMsgToMnode(pDnode, &rpcMsg);
} }
static void dndUpdateDnodeCfg(SDnode *pDnd, SDnodeCfg *pCfg) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pDnode = &pDnd->d; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pDnode->dnodeId != 0 && pDnode->dropped != pCfg->dropped) return; if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) {
dndLockDnode(pDnd);
pDnode->dnodeId = pCfg->dnodeId;
pDnode->clusterId = pCfg->clusterId;
pDnode->dropped = pCfg->dropped;
dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped);
dndWriteDnodes(pDnd); dndWLockDnode(pDnode);
dndUnLockDnode(pDnd); pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId;
pMgmt->dropped = pCfg->dropped;
(void)dndWriteDnodes(pDnode);
dndWUnLockDnode(pDnode);
}
} }
static void dndUpdateDnodeEps(SDnode *pDnd, SDnodeEps *pDnodeEps) { static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
if (pDnodeEps == NULL || pDnodeEps->num <= 0) return; if (pDnodeEps == NULL || pDnodeEps->num <= 0) return;
dndLockDnode(pDnd); dndWLockDnode(pDnode);
if (pDnodeEps->num != pDnd->d.dnodeEps->num) { if (pDnodeEps->num != pDnode->dmgmt.dnodeEps->num) {
dndResetDnodes(pDnd, pDnodeEps); dndResetDnodes(pDnode, pDnodeEps);
dndWriteDnodes(pDnd); dndWriteDnodes(pDnode);
} else { } else {
int32_t size = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps); int32_t size = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps);
if (memcmp(pDnd->d.dnodeEps, pDnodeEps, size) != 0) { if (memcmp(pDnode->dmgmt.dnodeEps, pDnodeEps, size) != 0) {
dndResetDnodes(pDnd, pDnodeEps); dndResetDnodes(pDnode, pDnodeEps);
dndWriteDnodes(pDnd); dndWriteDnodes(pDnode);
} }
} }
dndUnLockDnode(pDnd); dndWUnLockDnode(pDnode);
} }
static void dndProcessStatusRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0) { if (pEpSet && pEpSet->numOfEps > 0) {
dndUpdateMnodeEpSet(pDnd, pEpSet); dndUpdateMnodeEpSet(pDnode, pEpSet);
} }
if (pMsg->code != TSDB_CODE_SUCCESS) return; if (pMsg->code != TSDB_CODE_SUCCESS) return;
SStatusRsp *pStatusRsp = pMsg->pCont; SStatusRsp *pRsp = pMsg->pCont;
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; SDnodeCfg *pCfg = &pRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId); pCfg->clusterId = htobe64(pCfg->clusterId);
dndUpdateDnodeCfg(pDnd, pCfg); dndUpdateDnodeCfg(pDnode, pCfg);
if (pCfg->dropped) return; if (pCfg->dropped) return;
SDnodeEps *pDnodeEps = &pStatusRsp->dnodeEps; SDnodeEps *pDnodeEps = &pRsp->dnodeEps;
pDnodeEps->num = htonl(pDnodeEps->num); pDnodeEps->num = htonl(pDnodeEps->num);
for (int32_t i = 0; i < pDnodeEps->num; ++i) { for (int32_t i = 0; i < pDnodeEps->num; ++i) {
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port);
} }
dndUpdateDnodeEps(pDnd, pDnodeEps); dndUpdateDnodeEps(pDnode, pDnodeEps);
} }
static void dndProcessAuthRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); }
static void dndProcessGrantRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); }
static void dndProcessConfigDnodeReq(SDnode *pDnd, SRpcMsg *pMsg) { static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
SCfgDnodeMsg *pCfg = pMsg->pCont; SCfgDnodeMsg *pCfg = pMsg->pCont;
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
@ -449,11 +436,11 @@ static void dndProcessConfigDnodeReq(SDnode *pDnd, SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
static void dndProcessStartupReq(SDnode *pDnd, SRpcMsg *pMsg) { static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) {
dInfo("startup msg is received"); dInfo("startup msg is received");
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
dndGetStartup(pDnd, pStartup); dndGetStartup(pDnode, pStartup);
dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
@ -463,52 +450,52 @@ static void dndProcessStartupReq(SDnode *pDnd, SRpcMsg *pMsg) {
} }
static void *dnodeThreadRoutine(void *param) { static void *dnodeThreadRoutine(void *param) {
SDnode *pDnd = param; SDnode *pDnode = param;
int32_t ms = pDnd->opt.statusInterval * 1000; int32_t ms = pDnode->opt.statusInterval * 1000;
while (true) { while (true) {
taosMsleep(ms); taosMsleep(ms);
if (dndGetStat(pDnd) != DND_STAT_RUNNING) { if (dndGetStat(pDnode) != DND_STAT_RUNNING) {
continue; continue;
} }
pthread_testcancel(); pthread_testcancel();
dndSendStatusMsg(pDnd); dndSendStatusMsg(pDnode);
} }
} }
int32_t dndInitDnode(SDnode *pDnd) { int32_t dndInitDnode(SDnode *pDnode) {
SDnodeMgmt *pDnode = &pDnd->d; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pDnode->dnodeId = 0; pMgmt->dnodeId = 0;
pDnode->rebootTime = taosGetTimestampSec(); pMgmt->rebootTime = taosGetTimestampSec();
pDnode->dropped = 0; pMgmt->dropped = 0;
pDnode->clusterId = 0; pMgmt->clusterId = 0;
char path[PATH_MAX]; char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/dnode.json", pDnd->dir.dnode); snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode);
pDnode->file = strdup(path); pMgmt->file = strdup(path);
if (pDnode->file == NULL) { if (pMgmt->file == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pDnode->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pDnode->dnodeHash == NULL) { if (pMgmt->dnodeHash == NULL) {
dError("failed to init dnode hash"); dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
if (dndReadDnodes(pDnd) != 0) { if (dndReadDnodes(pDnode) != 0) {
dError("failed to read file:%s since %s", pDnode->file, terrstr()); dError("failed to read file:%s since %s", pMgmt->file, terrstr());
return -1; return -1;
} }
pthread_mutex_init(&pDnode->mutex, NULL); taosInitRWLatch(&pMgmt->latch);
pDnode->threadId = taosCreateThread(dnodeThreadRoutine, pDnd); pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode);
if (pDnode->threadId == NULL) { if (pMgmt->threadId == NULL) {
dError("failed to init dnode thread"); dError("failed to init dnode thread");
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
@ -518,44 +505,42 @@ int32_t dndInitDnode(SDnode *pDnd) {
return 0; return 0;
} }
void dndCleanupDnode(SDnode *pDnd) { void dndCleanupDnode(SDnode *pDnode) {
SDnodeMgmt *pDnode = &pDnd->d; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pDnode->threadId != NULL) { if (pMgmt->threadId != NULL) {
taosDestoryThread(pDnode->threadId); taosDestoryThread(pMgmt->threadId);
pDnode->threadId = NULL; pMgmt->threadId = NULL;
} }
dndLockDnode(pDnd); dndWLockDnode(pDnode);
if (pDnode->dnodeEps != NULL) { if (pMgmt->dnodeEps != NULL) {
free(pDnode->dnodeEps); free(pMgmt->dnodeEps);
pDnode->dnodeEps = NULL; pMgmt->dnodeEps = NULL;
} }
if (pDnode->dnodeHash != NULL) { if (pMgmt->dnodeHash != NULL) {
taosHashCleanup(pDnode->dnodeHash); taosHashCleanup(pMgmt->dnodeHash);
pDnode->dnodeHash = NULL; pMgmt->dnodeHash = NULL;
} }
if (pDnode->file != NULL) { if (pMgmt->file != NULL) {
free(pDnode->file); free(pMgmt->file);
pDnode->file = NULL; pMgmt->file = NULL;
} }
dndUnLockDnode(pDnd); dndWUnLockDnode(pDnode);
pthread_mutex_destroy(&pDnode->mutex);
dInfo("dnd-dnode is cleaned up"); dInfo("dnd-dnode is cleaned up");
} }
void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TSDB_MSG_TYPE_NETWORK_TEST: case TSDB_MSG_TYPE_NETWORK_TEST:
dndProcessStartupReq(pDnd, pMsg); dndProcessStartupReq(pDnode, pMsg);
break; break;
case TSDB_MSG_TYPE_CONFIG_DNODE_IN: case TSDB_MSG_TYPE_CONFIG_DNODE_IN:
dndProcessConfigDnodeReq(pDnd, pMsg); dndProcessConfigDnodeReq(pDnode, pMsg);
break; break;
default: default:
dError("RPC %p, dnode req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, dnode req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
@ -565,16 +550,16 @@ void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) {
} }
} }
void dndProcessDnodeRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TSDB_MSG_TYPE_STATUS_RSP: case TSDB_MSG_TYPE_STATUS_RSP:
dndProcessStatusRsp(pDnd, pMsg, pEpSet); dndProcessStatusRsp(pDnode, pMsg, pEpSet);
break; break;
case TSDB_MSG_TYPE_AUTH_RSP: case TSDB_MSG_TYPE_AUTH_RSP:
dndProcessAuthRsp(pDnd, pMsg, pEpSet); dndProcessAuthRsp(pDnode, pMsg, pEpSet);
break; break;
case TSDB_MSG_TYPE_GRANT_RSP: case TSDB_MSG_TYPE_GRANT_RSP:
dndProcessGrantRsp(pDnd, pMsg, pEpSet); dndProcessGrantRsp(pDnode, pMsg, pEpSet);
break; break;
default: default:
dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);

View File

@ -53,7 +53,7 @@ void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg) {
void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell) { void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell) {
assert(pMnode); assert(pMnode);
(*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg, forShell); (*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg);
} }
static int32_t mnodeInitTimer() { static int32_t mnodeInitTimer() {