From 413818a1042ac91e032e264f130dc50361aeefd9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 29 Sep 2021 19:39:40 +0800 Subject: [PATCH] [TD-10430] add mnode epset for dnode module --- source/server/dnode/inc/dnodeMnodeEps.h | 44 ++++ source/server/dnode/src/dnodeMnodeEps.c | 317 ++++++++++++++++++++++++ 2 files changed, 361 insertions(+) create mode 100644 source/server/dnode/inc/dnodeMnodeEps.h create mode 100644 source/server/dnode/src/dnodeMnodeEps.c diff --git a/source/server/dnode/inc/dnodeMnodeEps.h b/source/server/dnode/inc/dnodeMnodeEps.h new file mode 100644 index 0000000000..950fc3783d --- /dev/null +++ b/source/server/dnode/inc/dnodeMnodeEps.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DNODE_MNODE_EP_H_ +#define _TD_DNODE_MNODE_EP_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "dnodeInt.h" + +typedef struct DnMnEps { + Dnode * dnode; + SRpcEpSet mnodeEpSet; + SMInfos mnodeInfos; + char file[PATH_MAX + 20]; + pthread_mutex_t mutex; +} DnMnEps; + +int32_t dnodeInitMnodeEps(Dnode *dnode, DnMnEps **meps); +void dnodeCleanupMnodeEps(DnMnEps **meps); +void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *pMinfos); +void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *pEpSet); +void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet); +void dnodeGetEpSetForShell(DnMnEps *meps, SRpcEpSet *epSet); +void dnodeSendRedirectMsg(Dnode *dnode, SRpcMsg *rpcMsg, bool forShell); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DNODE_MNODE_EP_H_*/ diff --git a/source/server/dnode/src/dnodeMnodeEps.c b/source/server/dnode/src/dnodeMnodeEps.c new file mode 100644 index 0000000000..ba926b0094 --- /dev/null +++ b/source/server/dnode/src/dnodeMnodeEps.c @@ -0,0 +1,317 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "cJSON.h" +#include "tglobal.h" +#include "dnodeCfg.h" +#include "dnodeEps.h" +#include "dnodeMnodeEps.h" +#include "mnode.h" + +static void dnodePrintMnodeEps(DnMnEps *meps) { + SRpcEpSet *epset = &meps->mnodeEpSet; + dInfo("print mnode eps, num:%d inuse:%d", epset->numOfEps, epset->inUse); + for (int32_t i = 0; i < epset->numOfEps; i++) { + dInfo("ep index:%d, %s:%u", i, epset->fqdn[i], epset->port[i]); + } +} + +static void dnodeResetMnodeEps(DnMnEps *meps, SMInfos *mInfos) { + if (mInfos == NULL || mInfos->mnodeNum == 0) { + meps->mnodeEpSet.numOfEps = 1; + taosGetFqdnPortFromEp(tsFirst, meps->mnodeEpSet.fqdn[0], &meps->mnodeEpSet.port[0]); + + if (strcmp(tsSecond, tsFirst) != 0) { + meps->mnodeEpSet.numOfEps = 2; + taosGetFqdnPortFromEp(tsSecond, meps->mnodeEpSet.fqdn[1], &meps->mnodeEpSet.port[1]); + } + dnodePrintMnodeEps(meps); + return; + } + + int32_t size = sizeof(SMInfos); + memcpy(&meps->mnodeInfos, mInfos, size); + + meps->mnodeEpSet.inUse = meps->mnodeInfos.inUse; + meps->mnodeEpSet.numOfEps = meps->mnodeInfos.mnodeNum; + for (int32_t i = 0; i < meps->mnodeInfos.mnodeNum; i++) { + taosGetFqdnPortFromEp(meps->mnodeInfos.mnodeInfos[i].mnodeEp, meps->mnodeEpSet.fqdn[i], &meps->mnodeEpSet.port[i]); + } + + dnodePrintMnodeEps(meps); +} + +static int32_t dnodeWriteMnodeEps(DnMnEps *meps) { + FILE *fp = fopen(meps->file, "w"); + if (!fp) { + dError("failed to write %s since %s", meps->file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 2000; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", meps->mnodeInfos.inUse); + len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", meps->mnodeInfos.mnodeNum); + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); + for (int32_t i = 0; i < meps->mnodeInfos.mnodeNum; i++) { + len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", meps->mnodeInfos.mnodeInfos[i].mnodeId); + len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", meps->mnodeInfos.mnodeInfos[i].mnodeEp); + if (i < meps->mnodeInfos.mnodeNum - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsync(fileno(fp)); + fclose(fp); + free(content); + terrno = 0; + + dInfo("successed to write %s", meps->file); + return 0; +} + +static int32_t dnodeReadMnodeEps(DnMnEps *meps, DnEps *deps) { + int32_t len = 0; + int32_t maxLen = 2000; + char * content = calloc(1, maxLen + 1); + cJSON * root = NULL; + FILE * fp = NULL; + SMInfos mInfos = {0}; + bool nodeChanged = false; + + fp = fopen(meps->file, "r"); + if (!fp) { + dDebug("file %s not exist", meps->file); + goto PARSE_MINFOS_OVER; + } + + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", meps->file); + goto PARSE_MINFOS_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", meps->file); + goto PARSE_MINFOS_OVER; + } + + cJSON *inUse = cJSON_GetObjectItem(root, "inUse"); + if (!inUse || inUse->type != cJSON_Number) { + dError("failed to read mnodeEpSet.json since inUse not found"); + goto PARSE_MINFOS_OVER; + } + meps->mnodeInfos.inUse = (int8_t)inUse->valueint; + + cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum"); + if (!nodeNum || nodeNum->type != cJSON_Number) { + dError("failed to read mnodeEpSet.json since nodeNum not found"); + goto PARSE_MINFOS_OVER; + } + mInfos.mnodeNum = (int8_t)nodeNum->valueint; + + cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); + if (!nodeInfos || nodeInfos->type != cJSON_Array) { + dError("failed to read mnodeEpSet.json since nodeInfos not found"); + goto PARSE_MINFOS_OVER; + } + + int32_t size = cJSON_GetArraySize(nodeInfos); + if (size != mInfos.mnodeNum) { + dError("failed to read mnodeEpSet.json since nodeInfos size not matched"); + goto PARSE_MINFOS_OVER; + } + + for (int32_t i = 0; i < size; ++i) { + cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + if (nodeInfo == NULL) continue; + + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); + if (!nodeId || nodeId->type != cJSON_Number) { + dError("failed to read mnodeEpSet.json since nodeId not found"); + goto PARSE_MINFOS_OVER; + } + + cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); + if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { + dError("failed to read mnodeEpSet.json since nodeName not found"); + goto PARSE_MINFOS_OVER; + } + + SMInfo *mInfo = &mInfos.mnodeInfos[i]; + mInfo->mnodeId = (int32_t)nodeId->valueint; + tstrncpy(mInfo->mnodeEp, nodeEp->valuestring, TSDB_EP_LEN); + + bool changed = dnodeIsDnodeEpChanged(deps, mInfo->mnodeId, mInfo->mnodeEp); + if (changed) nodeChanged = changed; + } + + dInfo("successed to read file %s", meps->file); + +PARSE_MINFOS_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + terrno = 0; + + for (int32_t i = 0; i < mInfos.mnodeNum; ++i) { + SMInfo *mInfo = &mInfos.mnodeInfos[i]; + dnodeGetDnodeEp(meps->dnode, mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL); + } + + dnodeResetMnodeEps(meps, &mInfos); + + if (nodeChanged) { + dnodeWriteMnodeEps(meps); + } + + return 0; +} + +void dnodeSendRedirectMsg(struct Dnode *dnode, SRpcMsg *rpcMsg, bool forShell) { + DnMnEps *meps = dnode->meps; + SRpcConnInfo connInfo = {0}; + rpcGetConnInfo(rpcMsg->handle, &connInfo); + + SRpcEpSet epSet = {0}; + if (forShell) { + dnodeGetEpSetForShell(meps, &epSet); + } else { + dnodeGetEpSetForPeer(meps, &epSet); + } + + dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType], + taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse); + + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]); + if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) { + if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) || + (epSet.port[i] == tsServerPort && forShell)) { + epSet.inUse = (i + 1) % epSet.numOfEps; + dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse); + } + } + + epSet.port[i] = htons(epSet.port[i]); + } + + rpcSendRedirectRsp(rpcMsg->handle, &epSet); +} + +int32_t dnodeInitMnodeEps(Dnode *dnode, DnMnEps **out) { + DnMnEps *meps = calloc(1, sizeof(DnMnEps)); + if (meps == NULL) return -1; + + meps->dnode = dnode; + snprintf(meps->file, sizeof(meps->file), "%s/mnodeEpSet.json", tsDnodeDir); + pthread_mutex_init(&meps->mutex, NULL); + *out = meps; + + dnodeResetMnodeEps(meps, NULL); + int32_t ret = dnodeReadMnodeEps(meps, dnode->eps); + if (ret == 0) { + dInfo("dnode mInfos is initialized"); + } + + return ret; +} + +void dnodeCleanupMnodeEps(DnMnEps **out) { + DnMnEps *meps = *out; + *out = NULL; + + if (meps != NULL) { + pthread_mutex_destroy(&meps->mutex); + free(meps); + } +} + +void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *mInfos) { + if (mInfos->mnodeNum <= 0 || mInfos->mnodeNum > TSDB_MAX_REPLICA) { + dError("invalid mInfos since num:%d invalid", mInfos->mnodeNum); + return; + } + + for (int32_t i = 0; i < mInfos->mnodeNum; ++i) { + SMInfo *minfo = &mInfos->mnodeInfos[i]; + minfo->mnodeId = htonl(minfo->mnodeId); + if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) { + dError("invalid mInfo:%d since id:%d and ep:%s invalid", i, minfo->mnodeId, minfo->mnodeEp); + return; + } + } + + pthread_mutex_lock(&meps->mutex); + if (mInfos->mnodeNum != meps->mnodeInfos.mnodeNum) { + dnodeResetMnodeEps(meps, mInfos); + dnodeWriteMnodeEps(meps); + } else { + int32_t size = sizeof(SMInfos); + if (memcmp(mInfos, &meps->mnodeInfos, size) != 0) { + dnodeResetMnodeEps(meps, mInfos); + dnodeWriteMnodeEps(meps); + } + } + pthread_mutex_unlock(&meps->mutex); +} + +void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *ep) { + if (ep->numOfEps <= 0) { + dError("mInfos is changed, but content is invalid, discard it"); + return; + } + + pthread_mutex_lock(&meps->mutex); + + dInfo("mInfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse); + for (int32_t i = 0; i < ep->numOfEps; ++i) { + ep->port[i] -= TSDB_PORT_DNODEDNODE; + dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]); + } + meps->mnodeEpSet = *ep; + + pthread_mutex_unlock(&meps->mutex); +} + +void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet) { + pthread_mutex_lock(&meps->mutex); + + *epSet = meps->mnodeEpSet; + for (int32_t i = 0; i < epSet->numOfEps; ++i) { + epSet->port[i] += TSDB_PORT_DNODEDNODE; + } + + pthread_mutex_unlock(&meps->mutex); +} + +void dnodeGetEpSetForShell(DnMnEps *meps, SRpcEpSet *epSet) { + pthread_mutex_lock(&meps->mutex); + + *epSet = meps->mnodeEpSet; + + pthread_mutex_unlock(&meps->mutex); +}