refact dnode-dnode file

This commit is contained in:
Shengliang Guan 2021-11-02 14:27:20 +08:00
parent 1284e413b0
commit 4690773668
15 changed files with 462 additions and 479 deletions

View File

@ -65,30 +65,6 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
@ -121,6 +97,29 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" )
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY20, "dummy20" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY21, "dummy21" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY22, "dummy22" )
@ -133,9 +132,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY29, "dummy29" )
// message from dnode to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STATUS, "status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_GRANT, "grant" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH, "auth" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY30, "dummy30" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY31, "dummy31" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY32, "dummy32" )
@ -585,20 +584,6 @@ typedef struct SRetrieveTableRsp {
char data[];
} SRetrieveTableRsp;
typedef struct {
int32_t vgId;
int32_t dbCfgVersion;
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
uint64_t vnodeVersion;
int32_t vgCfgVersion;
uint8_t status;
uint8_t role;
uint8_t replica;
uint8_t compact;
} SVnodeLoad;
typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t cacheBlockSize; //MB
@ -665,28 +650,47 @@ typedef struct {
uint8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg, SSyncDbMsg;
// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed
// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE
typedef struct {
int64_t pointsWritten; // In unit of points
int64_t totalStorage; // In unit of bytes
int64_t compStorage; // In unit of bytes
int64_t queryTime; // In unit of second ??
char reserved[64];
} SVnodeStatisticInfo;
int32_t statusInterval;
int8_t reserved[4];
int64_t checkTime; // 1970-01-01 00:00:00.000
char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone
char locale[TSDB_LOCALE_LEN]; // tsLocale
char charset[TSDB_LOCALE_LEN]; // tsCharset
} SClusterCfg;
typedef struct SVgroupAccess {
int32_t vgId;
int8_t accessState;
} SVgroupAccess;
typedef struct {
int32_t vgId;
int8_t status;
int8_t role;
int8_t reserved[2];
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
int64_t tablesNum;
} SVnodeLoad;
typedef struct {
int32_t vnodeNum;
SVnodeLoad vnodeLoads[];
} SVnodeLoads;
typedef struct SStatusMsg {
uint32_t sversion;
int32_t dnodeId;
int64_t clusterId;
uint32_t rebootTime; // time stamp for last reboot
int32_t numOfCores;
char dnodeEp[TSDB_EP_LEN];
SClusterCfg clusterCfg;
SVnodeLoads vnodeLoads;
} SStatusMsg;
typedef struct {
int32_t dnodeId;
int8_t dropped;
char reserved[19];
char reserved[3];
int64_t clusterId;
int32_t numOfDnodes;
int32_t numOfVnodes;
} SDnodeCfg;
typedef struct {
@ -703,31 +707,8 @@ typedef struct {
} SDnodeEps;
typedef struct {
int32_t statusInterval; // tsStatusInterval
int8_t reserved[36];
int64_t checkTime; // 1970-01-01 00:00:00.000
char timezone[64]; // tsTimezone
char locale[TSDB_LOCALE_LEN]; // tsLocale
char charset[TSDB_LOCALE_LEN]; // tsCharset
} SClusterCfg;
typedef struct SStatusMsg {
uint32_t version;
int32_t dnodeId;
uint32_t lastReboot; // time stamp for last reboot
int32_t openVnodes;
int32_t numOfCores;
float diskAvailable;
int8_t reserved[36];
char dnodeEp[TSDB_EP_LEN];
int64_t clusterId;
SClusterCfg clusterCfg;
SVnodeLoad load[];
} SStatusMsg;
typedef struct {
SDnodeCfg dnodeCfg;
SVgroupAccess vgAccess[];
SDnodeCfg dnodeCfg;
SDnodeEps dnodeEps;
} SStatusRsp;
typedef struct {

View File

@ -198,7 +198,7 @@ extern SDiskCfg tsDiskCfg[];
void taosInitGlobalCfg();
int32_t taosCheckGlobalCfg();
bool taosCfgDynamicOptions(char *msg);
int32_t taosCfgDynamicOptions(char *msg);
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId);
void taosAddDataDir(int index, char *v1, int level, int primary);

View File

@ -58,10 +58,6 @@ typedef struct {
int8_t syncRole;
} SVnodeStatus;
typedef struct {
int32_t accessState;
} SVnodeAccess;
typedef struct SVnodeMsg {
int32_t msgType;
int32_t code;

View File

@ -277,13 +277,13 @@ void taosSetAllDebugFlag() {
}
}
bool taosCfgDynamicOptions(char *msg) {
int32_t taosCfgDynamicOptions(char *msg) {
char *option, *value;
int32_t olen, vlen;
int32_t vint = 0;
paGetToken(msg, &option, &olen);
if (olen == 0) return false;;
if (olen == 0) return -1;;
paGetToken(option + olen + 1, &value, &vlen);
if (vlen == 0)
@ -324,18 +324,18 @@ bool taosCfgDynamicOptions(char *msg) {
uError("monitor can't be updated, for monitor not initialized");
}
}
return true;
return 0;
}
if (strncasecmp(cfg->option, "debugFlag", olen) == 0) {
taosSetAllDebugFlag();
}
return true;
return 0;
}
if (strncasecmp(option, "resetlog", 8) == 0) {
taosResetLog();
taosPrintGlobalCfg();
return true;
return 0;
}
if (strncasecmp(option, "resetQueryCache", 15) == 0) {

View File

@ -23,23 +23,14 @@ extern "C" {
int32_t dnodeInitDnode();
void dnodeCleanupDnode();
void dnodeProcessStatusRsp(SRpcMsg *pMsg);
void dnodeProcessStartupReq(SRpcMsg *pMsg);
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
int32_t dnodeInitConfig();
void dnodeCleanupConfig();
void dnodeUpdateCfg(SDnodeCfg *data);
void dnodeUpdateDnodeEps(SDnodeEps *data);
void dnodeUpdateMnodeEps(SRpcEpSet *pEpSet);
int32_t dnodeGetDnodeId();
int64_t dnodeGetClusterId();
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetMnodeEpSetForShell(SRpcEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus
}

View File

@ -35,6 +35,7 @@ extern int32_t dDebugFlag;
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat;
typedef void (*MsgFp)(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
int32_t dnodeInit();
void dnodeCleanup();

View File

@ -23,6 +23,7 @@ extern "C" {
int32_t dnodeInitMnode();
void dnodeCleanupMnode();
void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg);

View File

@ -23,6 +23,8 @@ extern "C" {
int32_t dnodeInitVnodes();
void dnodeCleanupVnodes();
void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet);
void dnodeGetVnodes(SVnodeLoads *pVloads);
#ifdef __cplusplus
}

View File

@ -16,70 +16,83 @@
#define _DEFAULT_SOURCE
#include "dnodeDnode.h"
#include "dnodeTransport.h"
#include "tthread.h"
#include "ttime.h"
#include "dnodeVnodes.h"
#include "cJSON.h"
#include "thash.h"
#include "tthread.h"
#include "ttime.h"
static struct {
int32_t dnodeId;
int32_t dropped;
int64_t clusterId;
SDnodeEps *dnodeEps;
SHashObj *dnodeHash;
SRpcEpSet mnodeEpSetForShell;
SRpcEpSet mnodeEpSetForPeer;
char file[PATH_MAX + 20];
uint32_t rebootTime;
int8_t dropped;
int8_t threadStop;
pthread_t *threadId;
pthread_mutex_t mutex;
} tsConfig;
MsgFp msgFp[TSDB_MSG_TYPE_MAX];
} tsDnode = {0};
void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsConfig.mutex);
*epSet = tsConfig.mnodeEpSetForPeer;
pthread_mutex_unlock(&tsConfig.mutex);
int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0;
pthread_mutex_lock(&tsDnode.mutex);
dnodeId = tsDnode.dnodeId;
pthread_mutex_unlock(&tsDnode.mutex);
return dnodeId;
}
static void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsConfig.mutex);
*epSet = tsConfig.mnodeEpSetForShell;
pthread_mutex_unlock(&tsConfig.mutex);
int64_t dnodeGetClusterId() {
int64_t clusterId = 0;
pthread_mutex_lock(&tsDnode.mutex);
clusterId = tsDnode.clusterId;
pthread_mutex_unlock(&tsDnode.mutex);
return clusterId;
}
void dnodeUpdateMnodeEps(SRpcEpSet *ep) {
if (ep != NULL || ep->numOfEps <= 0) {
dError("mnode is changed, but content is invalid, discard it");
return;
void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) {
pthread_mutex_lock(&tsDnode.mutex);
SDnodeEp *pEp = taosHashGet(tsDnode.dnodeHash, &dnodeId, sizeof(int32_t));
if (pEp != NULL) {
if (port) *port = pEp->dnodePort;
if (fqdn) tstrncpy(fqdn, pEp->dnodeFqdn, TSDB_FQDN_LEN);
if (ep) snprintf(ep, TSDB_EP_LEN, "%s:%u", pEp->dnodeFqdn, pEp->dnodePort);
}
pthread_mutex_lock(&tsConfig.mutex);
dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse);
tsConfig.mnodeEpSetForPeer = *ep;
for (int32_t i = 0; i < ep->numOfEps; ++i) {
ep->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
}
tsConfig.mnodeEpSetForShell = *ep;
pthread_mutex_unlock(&tsConfig.mutex);
pthread_mutex_unlock(&tsDnode.mutex);
}
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
void dnodeGetMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
pthread_mutex_lock(&tsDnode.mutex);
*pEpSet = tsDnode.mnodeEpSetForPeer;
pthread_mutex_unlock(&tsDnode.mutex);
}
void dnodeGetMnodeEpSetForShell(SRpcEpSet *pEpSet) {
pthread_mutex_lock(&tsDnode.mutex);
*pEpSet = tsDnode.mnodeEpSetForShell;
pthread_mutex_unlock(&tsDnode.mutex);
}
void dnodeSendRedirectMsg(SRpcMsg *pMsg, bool forShell) {
int32_t msgType = pMsg->msgType;
SRpcEpSet epSet = {0};
if (forShell) {
dnodeGetEpSetForShell(&epSet);
dnodeGetMnodeEpSetForShell(&epSet);
} else {
dnodeGetEpSetForPeer(&epSet);
dnodeGetMnodeEpSetForPeer(&epSet);
}
dDebug("msg:%s will be redirected, num:%d use:%d", taosMsg[rpcMsg->msgType], epSet.numOfEps, epSet.inUse);
dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, taosMsg[msgType], epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) {
if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) ||
(epSet.port[i] == tsServerPort && forShell)) {
@ -91,71 +104,88 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
epSet.port[i] = htons(epSet.port[i]);
}
rpcSendRedirectRsp(rpcMsg->handle, &epSet);
rpcSendRedirectRsp(pMsg->handle, &epSet);
}
static void dnodeUpdateMnodeEpSet(SRpcEpSet *pEpSet) {
if (pEpSet == NULL || pEpSet->numOfEps <= 0) {
dError("mnode is changed, but content is invalid, discard it");
return;
} else {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
}
pthread_mutex_lock(&tsDnode.mutex);
tsDnode.mnodeEpSetForPeer = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
}
tsDnode.mnodeEpSetForShell = *pEpSet;
pthread_mutex_unlock(&tsDnode.mutex);
}
static void dnodePrintEps() {
dDebug("print dnode list, num:%d", tsConfig.dnodeEps->dnodeNum);
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
dDebug("print dnode endpoint list, num:%d", tsDnode.dnodeEps->dnodeNum);
for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i];
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode);
}
}
static void dnodeResetEps(SDnodeEps *data) {
assert(data != NULL);
static void dnodeResetEps(SDnodeEps *pEps) {
assert(pEps != NULL);
int32_t size = sizeof(SDnodeEps) + pEps->dnodeNum * sizeof(SDnodeEp);
int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp);
if (data->dnodeNum > tsConfig.dnodeEps->dnodeNum) {
if (pEps->dnodeNum > tsDnode.dnodeEps->dnodeNum) {
SDnodeEps *tmp = calloc(1, size);
if (tmp == NULL) return;
tfree(tsConfig.dnodeEps);
tsConfig.dnodeEps = tmp;
tfree(tsDnode.dnodeEps);
tsDnode.dnodeEps = tmp;
}
if (tsConfig.dnodeEps != data) {
memcpy(tsConfig.dnodeEps, data, size);
if (tsDnode.dnodeEps != pEps) {
memcpy(tsDnode.dnodeEps, pEps, size);
}
tsConfig.mnodeEpSetForPeer.inUse = 0;
tsConfig.mnodeEpSetForShell.inUse = 0;
int32_t index = 0;
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
tsDnode.mnodeEpSetForPeer.inUse = 0;
tsDnode.mnodeEpSetForShell.inUse = 0;
int32_t mIndex = 0;
for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i];
if (!ep->isMnode) continue;
if (index >= TSDB_MAX_REPLICA) continue;
strcpy(tsConfig.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn);
strcpy(tsConfig.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn);
tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort;
tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort;
index++;
if (mIndex >= TSDB_MAX_REPLICA) continue;
strcpy(tsDnode.mnodeEpSetForShell.fqdn[mIndex], ep->dnodeFqdn);
strcpy(tsDnode.mnodeEpSetForPeer.fqdn[mIndex], ep->dnodeFqdn);
tsDnode.mnodeEpSetForShell.port[mIndex] = ep->dnodePort;
tsDnode.mnodeEpSetForShell.port[mIndex] = ep->dnodePort + tsDnodeDnodePort;
mIndex++;
}
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
taosHashPut(tsConfig.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i];
taosHashPut(tsDnode.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
}
dnodePrintEps();
}
static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) {
static bool dnodeIsEpChanged(int32_t dnodeId, char *epStr) {
bool changed = false;
pthread_mutex_lock(&tsDnode.mutex);
pthread_mutex_lock(&tsConfig.mutex);
SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
SDnodeEp *pEp = taosHashGet(tsDnode.dnodeHash, &dnodeId, sizeof(int32_t));
if (pEp != NULL) {
char epSaved[TSDB_EP_LEN + 1];
snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
changed = strcmp(epstr, epSaved) != 0;
tstrncpy(epstr, epSaved, TSDB_EP_LEN);
snprintf(epSaved, TSDB_EP_LEN, "%s:%u", pEp->dnodeFqdn, pEp->dnodePort);
changed = strcmp(epStr, epSaved) != 0;
}
pthread_mutex_unlock(&tsConfig.mutex);
pthread_mutex_unlock(&tsDnode.mutex);
return changed;
}
@ -166,101 +196,101 @@ static int32_t dnodeReadEps() {
cJSON *root = NULL;
FILE *fp = NULL;
fp = fopen(tsConfig.file, "r");
fp = fopen(tsDnode.file, "r");
if (!fp) {
dDebug("file %s not exist", tsConfig.file);
dDebug("file %s not exist", tsDnode.file);
goto PRASE_EPS_OVER;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s since content is null", tsConfig.file);
dError("failed to read %s since content is null", tsDnode.file);
goto PRASE_EPS_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", tsConfig.file);
dError("failed to read %s since invalid json format", tsDnode.file);
goto PRASE_EPS_OVER;
}
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s since dnodeId not found", tsConfig.file);
dError("failed to read %s since dnodeId not found", tsDnode.file);
goto PRASE_EPS_OVER;
}
tsConfig.dnodeId = atoi(dnodeId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", tsConfig.file);
goto PRASE_EPS_OVER;
}
tsConfig.dropped = atoi(dropped->valuestring);
tsDnode.dnodeId = atoi(dnodeId->valuestring);
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", tsConfig.file);
dError("failed to read %s since clusterId not found", tsDnode.file);
goto PRASE_EPS_OVER;
}
tsConfig.clusterId = atoll(clusterId->valuestring);
tsDnode.clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", tsDnode.file);
goto PRASE_EPS_OVER;
}
tsDnode.dropped = atoi(dropped->valuestring);
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s since dnodeInfos not found", tsConfig.file);
dError("failed to read %s since dnodeInfos not found", tsDnode.file);
goto PRASE_EPS_OVER;
}
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize <= 0) {
dError("failed to read %s since dnodeInfos size:%d invalid", tsConfig.file, dnodeInfosSize);
dError("failed to read %s since dnodeInfos size:%d invalid", tsDnode.file, dnodeInfosSize);
goto PRASE_EPS_OVER;
}
tsConfig.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (tsConfig.dnodeEps == NULL) {
tsDnode.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (tsDnode.dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_EPS_OVER;
}
tsConfig.dnodeEps->dnodeNum = dnodeInfosSize;
tsDnode.dnodeEps->dnodeNum = dnodeInfosSize;
for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break;
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
SDnodeEp *pEp = &tsDnode.dnodeEps->dnodeEps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s, dnodeId not found", tsConfig.file);
dError("failed to read %s, dnodeId not found", tsDnode.file);
goto PRASE_EPS_OVER;
}
ep->dnodeId = atoi(dnodeId->valuestring);
pEp->dnodeId = atoi(dnodeId->valuestring);
cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
if (!isMnode || isMnode->type != cJSON_String) {
dError("failed to read %s, isMnode not found", tsConfig.file);
dError("failed to read %s, isMnode not found", tsDnode.file);
goto PRASE_EPS_OVER;
}
ep->isMnode = atoi(isMnode->valuestring);
pEp->isMnode = atoi(isMnode->valuestring);
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", tsConfig.file);
dError("failed to read %s, dnodeFqdn not found", tsDnode.file);
goto PRASE_EPS_OVER;
}
tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
tstrncpy(pEp->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_String) {
dError("failed to read %s, dnodePort not found", tsConfig.file);
dError("failed to read %s, dnodePort not found", tsDnode.file);
goto PRASE_EPS_OVER;
}
ep->dnodePort = atoi(dnodePort->valuestring);
pEp->dnodePort = atoi(dnodePort->valuestring);
}
dInfo("succcessed to read file %s", tsConfig.file);
dInfo("succcessed to read file %s", tsDnode.file);
dnodePrintEps();
PRASE_EPS_OVER:
@ -268,21 +298,21 @@ PRASE_EPS_OVER:
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
if (dnodeIsDnodeEpChanged(tsConfig.dnodeId, tsLocalEp)) {
dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsConfig.dnodeId, tsLocalEp);
if (dnodeIsEpChanged(tsDnode.dnodeId, tsLocalEp)) {
dError("localEp %s different with %s and need reconfigured", tsLocalEp, tsDnode.file);
return -1;
}
dnodeResetEps(tsConfig.dnodeEps);
dnodeResetEps(tsDnode.dnodeEps);
terrno = 0;
return 0;
}
static int32_t dnodeWriteEps() {
FILE *fp = fopen(tsConfig.file, "w");
FILE *fp = fopen(tsDnode.file, "w");
if (!fp) {
dError("failed to write %s since %s", tsConfig.file, strerror(errno));
dError("failed to write %s since %s", tsDnode.file, strerror(errno));
return -1;
}
@ -291,17 +321,17 @@ static int32_t dnodeWriteEps() {
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsConfig.dnodeId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsConfig.dropped);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsConfig.clusterId);
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsDnode.dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsDnode.clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsDnode.dropped);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId);
len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort);
if (i < tsConfig.dnodeEps->dnodeNum - 1) {
if (i < tsDnode.dnodeEps->dnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
@ -315,150 +345,76 @@ static int32_t dnodeWriteEps() {
free(content);
terrno = 0;
dInfo("successed to write %s", tsConfig.file);
dInfo("successed to write %s", tsDnode.file);
return 0;
}
int32_t dnodeInitConfig() {
tsConfig.dnodeId = 0;
tsConfig.dropped = 0;
tsConfig.clusterId = 0;
tsConfig.dnodeEps = NULL;
snprintf(tsConfig.file, sizeof(tsConfig.file), "%s/dnodeEps.json", tsDnodeDir);
pthread_mutex_init(&tsConfig.mutex, NULL);
tsConfig.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsConfig.dnodeHash == NULL) return -1;
int32_t ret = dnodeReadEps();
if (ret == 0) {
dInfo("dnode eps is initialized");
}
return ret;
}
void dnodeCleanupConfig() {
pthread_mutex_lock(&tsConfig.mutex);
if (tsConfig.dnodeEps != NULL) {
free(tsConfig.dnodeEps);
tsConfig.dnodeEps = NULL;
}
if (tsConfig.dnodeHash) {
taosHashCleanup(tsConfig.dnodeHash);
tsConfig.dnodeHash = NULL;
}
pthread_mutex_unlock(&tsConfig.mutex);
pthread_mutex_destroy(&tsConfig.mutex);
}
void dnodeUpdateDnodeEps(SDnodeEps *data) {
if (data == NULL || data->dnodeNum <= 0) return;
pthread_mutex_lock(&tsConfig.mutex);
if (data->dnodeNum != tsConfig.dnodeEps->dnodeNum) {
dnodeResetEps(data);
dnodeWriteEps();
} else {
int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps);
if (memcmp(tsConfig.dnodeEps, data, size) != 0) {
dnodeResetEps(data);
dnodeWriteEps();
}
}
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
pthread_mutex_lock(&tsConfig.mutex);
SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
if (port) *port = ep->dnodePort;
if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
}
pthread_mutex_unlock(&tsConfig.mutex);
}
void dnodeUpdateCfg(SDnodeCfg *data) {
if (tsConfig.dnodeId != 0 && !data->dropped) return;
pthread_mutex_lock(&tsConfig.mutex);
tsConfig.dnodeId = data->dnodeId;
tsConfig.clusterId = data->clusterId;
tsConfig.dropped = data->dropped;
dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId);
dnodeWriteEps();
pthread_mutex_unlock(&tsConfig.mutex);
}
int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0;
pthread_mutex_lock(&tsConfig.mutex);
dnodeId = tsConfig.dnodeId;
pthread_mutex_unlock(&tsConfig.mutex);
return dnodeId;
}
int64_t dnodeGetClusterId() {
int64_t clusterId = 0;
pthread_mutex_lock(&tsConfig.mutex);
clusterId = tsConfig.clusterId;
pthread_mutex_unlock(&tsConfig.mutex);
return clusterId;
}
static struct {
pthread_t *threadId;
bool threadStop;
uint32_t rebootTime;
} tsDnode;
static void dnodeSendStatusMsg() {
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
dError("failed to malloc status message");
return;
}
pStatus->version = htonl(tsVersion);
pStatus->sversion = htonl(tsVersion);
pStatus->dnodeId = htonl(dnodeGetDnodeId());
tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
pStatus->clusterId = htobe64(dnodeGetClusterId());
pStatus->lastReboot = htonl(tsDnode.rebootTime);
pStatus->rebootTime = htonl(tsDnode.rebootTime);
pStatus->numOfCores = htonl(tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
// fill cluster cfg parameters
pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
pStatus->clusterCfg.checkTime = 0;
tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, TSDB_TIMEZONE_LEN);
tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
// vnodeGetStatus(NULL, pStatus);
// contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
// pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS};
dnodeGetVnodes(&pStatus->vnodeLoads);
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.vnodeNum * sizeof(SVnodeLoad);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS};
dnodeSendMsgToMnode(&rpcMsg);
}
void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
dTrace("status rsp is received, code:%s", tstrerror(pMsg->code));
static void dnodeUpdateCfg(SDnodeCfg *pCfg) {
if (tsDnode.dnodeId == 0) return;
if (tsDnode.dropped) return;
pthread_mutex_lock(&tsDnode.mutex);
tsDnode.dnodeId = pCfg->dnodeId;
tsDnode.clusterId = pCfg->clusterId;
tsDnode.dropped = pCfg->dropped;
dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, pCfg->dnodeId, pCfg->clusterId);
dnodeWriteEps();
pthread_mutex_unlock(&tsDnode.mutex);
}
static void dnodeUpdateDnodeEps(SDnodeEps *pEps) {
if (pEps == NULL || pEps->dnodeNum <= 0) return;
pthread_mutex_lock(&tsDnode.mutex);
if (pEps->dnodeNum != tsDnode.dnodeEps->dnodeNum) {
dnodeResetEps(pEps);
dnodeWriteEps();
} else {
int32_t size = pEps->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps);
if (memcmp(tsDnode.dnodeEps, pEps, size) != 0) {
dnodeResetEps(pEps);
dnodeWriteEps();
}
}
pthread_mutex_unlock(&tsDnode.mutex);
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) return;
SStatusRsp *pStatusRsp = pMsg->pCont;
@ -466,25 +422,40 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId);
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
pCfg->numOfDnodes = htonl(pCfg->numOfDnodes);
dnodeUpdateCfg(pCfg);
if (pCfg->dropped) {
dError("status rsp is received, and set dnode to drop status");
return;
if (pCfg->dropped) return;
SDnodeEps *pEps = &pStatusRsp->dnodeEps;
pEps->dnodeNum = htonl(pEps->dnodeNum);
for (int32_t i = 0; i < pEps->dnodeNum; ++i) {
pEps->dnodeEps[i].dnodeId = htonl(pEps->dnodeEps[i].dnodeId);
pEps->dnodeEps[i].dnodePort = htons(pEps->dnodeEps[i].dnodePort);
}
// vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
dnodeUpdateDnodeEps(pEps);
}
SDnodeEps *eps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
eps->dnodeNum = htonl(eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; ++i) {
eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId);
eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort);
}
static void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
SCfgDnodeMsg *pCfg = pMsg->pCont;
dnodeUpdateDnodeEps(eps);
int32_t code = taosCfgDynamicOptions(pCfg->config);
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
static void dnodeProcessStartupReq(SRpcMsg *pMsg) {
dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
dnodeGetStartup(pStep);
dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
}
static void *dnodeThreadRoutine(void *param) {
@ -496,14 +467,34 @@ static void *dnodeThreadRoutine(void *param) {
}
int32_t dnodeInitDnode() {
tsDnode.threadStop = false;
tsDnode.dnodeId = 0;
tsDnode.clusterId = 0;
tsDnode.dnodeEps = NULL;
snprintf(tsDnode.file, sizeof(tsDnode.file), "%s/dnode.json", tsDnodeDir);
tsDnode.rebootTime = taosGetTimestampSec();
tsDnode.threadId = taosCreateThread(dnodeThreadRoutine, NULL);
if (tsDnode.threadId == NULL) {
return -1;
tsDnode.dropped = 0;
pthread_mutex_init(&tsDnode.mutex, NULL);
tsDnode.threadStop = false;
tsDnode.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsDnode.dnodeHash == NULL) {
dError("failed to init dnode hash");
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
dInfo("dnode msg is initialized");
tsDnode.threadId = taosCreateThread(dnodeThreadRoutine, NULL);
if (tsDnode.threadId == NULL) {
dError("failed to init dnode thread");
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
int32_t code = dnodeReadEps();
if (code != 0) {
dError("failed to read dnode endpoint file since %s", tstrerror(code));
return code;
}
dInfo("dnode-dnode is initialized");
return 0;
}
@ -514,29 +505,45 @@ void dnodeCleanupDnode() {
tsDnode.threadId = NULL;
}
dInfo("dnode msg is cleanuped");
pthread_mutex_lock(&tsDnode.mutex);
if (tsDnode.dnodeEps != NULL) {
free(tsDnode.dnodeEps);
tsDnode.dnodeEps = NULL;
}
if (tsDnode.dnodeHash) {
taosHashCleanup(tsDnode.dnodeHash);
tsDnode.dnodeHash = NULL;
}
pthread_mutex_unlock(&tsDnode.mutex);
pthread_mutex_destroy(&tsDnode.mutex);
dInfo("dnode-dnode is cleaned up");
}
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
SCfgDnodeMsg *pCfg = pMsg->pCont;
void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
int32_t msgType = pMsg->msgType;
int32_t code = taosCfgDynamicOptions(pCfg->config);
if (msgType == TSDB_MSG_TYPE_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeEpSet(pEpSet);
}
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
void dnodeProcessStartupReq(SRpcMsg *pMsg) {
dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
dnodeGetStartup(pStep);
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
switch (msgType) {
case TSDB_MSG_TYPE_NETWORK_TEST:
dnodeProcessStartupReq(pMsg);
break;
case TSDB_MSG_TYPE_CONFIG_DNODE_IN:
dnodeProcessConfigDnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_STATUS_RSP:
dnodeProcessStatusRsp(pMsg);
break;
default:
dError("RPC %p, %s not processed", pMsg->handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
}

View File

@ -21,7 +21,7 @@
int32_t dnodeInitMnode() {
SMnodePara para;
para.fp.GetDnodeEp = dnodeGetEp;
para.fp.GetDnodeEp = dnodeGetDnodeEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
@ -59,4 +59,11 @@ void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
mnodeProcessMsg(pMsg);
// tsDnode.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq;
// tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessDropMnodeReq;
}

View File

@ -26,9 +26,6 @@
#include "dnodeVnodes.h"
#include "mnode.h"
#include "vnode.h"
typedef void (*MsgFp)(SRpcMsg *pMsg);
static struct {
void *serverRpc;
void *clientRpc;
@ -38,88 +35,88 @@ static struct {
static void dnodeInitMsgFp() {
// msg from client to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq;
tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;
// msg from client to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg;
// message from mnode to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = dnodeProcessDnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dnodeProcessMnodeMsg;
// message from dnode to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
tsTrans.msgFp[TSDB_MSG_TYPE_AUTH] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dnodeProcessDnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_GRANT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dnodeProcessDnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STATUS] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg;
}
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
@ -127,7 +124,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dnodeProcessStartupReq(pMsg);
dnodeProcessDnodeMsg(pMsg, pEpSet);
return;
}
@ -148,7 +145,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
(*fp)(pMsg, pEpSet);
} else {
dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
@ -196,14 +193,10 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return;
}
if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeEps(pEpSet);
}
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
dTrace("RPC %p, peer rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
(*fp)(pMsg, pEpSet);
} else {
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
}
@ -270,7 +263,7 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
(*fp)(pMsg, pEpSet);
} else {
dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
@ -283,13 +276,13 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsT
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(&epSet);
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, rpcMsg);
}
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(&epSet);
dnodeGetMnodeEpSetForPeer(&epSet);
rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp);
}
@ -303,7 +296,7 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pMsg;
rpcMsg.contLen = sizeof(SAuthMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;
rpcMsg.msgType = TSDB_MSG_TYPE_AUTH;
dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0};

View File

@ -19,4 +19,8 @@
int32_t dnodeInitVnodes() { return vnodeInit(); }
void dnodeCleanupVnodes() { vnodeCleanup(); }
void dnodeCleanupVnodes() { vnodeCleanup(); }
void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); }
void dnodeGetVnodes(SVnodeLoads *pVloads) {}

View File

@ -120,7 +120,7 @@ typedef struct SDnodeObj {
int64_t createdTime;
int64_t updateTime;
int64_t lastAccess;
int64_t lastReboot; // time stamp for last reboot
int64_t rebootTime; // time stamp for last reboot
char fqdn[TSDB_FQDN_LEN];
char ep[TSDB_EP_LEN];
uint16_t port;

View File

@ -171,12 +171,12 @@ static void mnodeInitMsgFp() {
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessTableCfgMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessVnodeCfgMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessAuthMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeDispatchToPeerQueue;
// // tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_GRANT] = grantProcessMsgInMgmt;
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessDnodeStatusMsg;
// tsMworker.msgFp[TSDB_MSG_TYPE_AUTH] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_AUTH] = mnodeProcessAuthMsg;
// // tsMworker.msgFp[TSDB_MSG_TYPE_GRANT] = mnodeDispatchToPeerQueue;
// // tsMworker.peerReqFp[TSDB_MSG_TYPE_GRANT] = grantProcessMsgInMgmt;
// tsMworker.msgFp[TSDB_MSG_TYPE_STATUS] = mnodeDispatchToPeerQueue;
// tsMworker.peerReqFp[TSDB_MSG_TYPE_STATUS] = mnodeProcessDnodeStatusMsg;
// // peer rsp
// tsMworker.msgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeDispatchToPeerRspQueue;

View File

@ -406,7 +406,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_SHOW_RETRIEVE
|| type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_STABLE_VGROUP
|| type == TSDB_MSG_TYPE_TABLES_META || type == TSDB_MSG_TYPE_TABLE_META
|| type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_DM_STATUS || type == TSDB_MSG_TYPE_ALTER_TABLE)
|| type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_STATUS || type == TSDB_MSG_TYPE_ALTER_TABLE)
pContext->connType = RPC_CONN_TCPC;
pContext->rid = taosAddRef(tsRpcRefId, pContext);