diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 4094871bcd..106f192856 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -71,6 +71,9 @@ typedef struct { int32_t refCount; int8_t deployed; int8_t dropped; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; SWorkerPool mgmtPool; SWorkerPool readPool; SWorkerPool writePool; diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 0a764af8dc..fa35e9b573 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -101,7 +101,7 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; int32_t len = 0; - int32_t maxLen = 300; + int32_t maxLen = 4096; char *content = calloc(1, maxLen + 1); cJSON *root = NULL; @@ -139,6 +139,46 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { } pMgmt->dropped = atoi(dropped->valuestring); + cJSON *nodes = cJSON_GetObjectItem(root, "nodes"); + if (!nodes || nodes->type != cJSON_Array) { + dError("failed to read %s since nodes not found", pMgmt->file); + goto PRASE_MNODE_OVER; + } + + pMgmt->replica = cJSON_GetArraySize(nodes); + if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { + dError("failed to read %s since nodes size %d invalid", pMgmt->file, pMgmt->replica); + goto PRASE_MNODE_OVER; + } + + for (int32_t i = 0; i < pMgmt->replica; ++i) { + cJSON *node = cJSON_GetArrayItem(nodes, i); + if (node == NULL) break; + + SReplica *pReplica = &pMgmt->replicas[i]; + + cJSON *id = cJSON_GetObjectItem(node, "id"); + if (!id || id->type != cJSON_String || id->valuestring == NULL) { + dError("failed to read %s since id not found", pMgmt->file); + goto PRASE_MNODE_OVER; + } + pReplica->id = atoi(id->valuestring); + + cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); + if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { + dError("failed to read %s since fqdn not found", pMgmt->file); + goto PRASE_MNODE_OVER; + } + tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); + + cJSON *port = cJSON_GetObjectItem(node, "port"); + if (!port || port->type != cJSON_String || port->valuestring == NULL) { + dError("failed to read %s since port not found", pMgmt->file); + goto PRASE_MNODE_OVER; + } + pReplica->port = atoi(port->valuestring); + } + code = 0; dInfo("succcessed to read file %s", pMgmt->file); @@ -153,7 +193,8 @@ PRASE_MNODE_OVER: static int32_t dndWriteMnodeFile(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - char file[PATH_MAX + 20] = {0}; + + char file[PATH_MAX + 20] = {0}; snprintf(file, sizeof(file), "%s.bak", pMgmt->file); FILE *fp = fopen(file, "w"); @@ -164,12 +205,25 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) { } int32_t len = 0; - int32_t maxLen = 300; + int32_t maxLen = 4096; char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", pMgmt->deployed); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pMgmt->dropped); + + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, " \"nodes\": [{\n"); + for (int32_t i = 0; i < pMgmt->replica; ++i) { + SReplica *pReplica = &pMgmt->replicas[i]; + len += snprintf(content + len, maxLen - len, " \"id\": \"%d\",\n", pReplica->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": \"%u\"\n", pReplica->port); + if (i < pMgmt->replica - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); @@ -269,11 +323,19 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { pReplica->id = 1; pReplica->port = pDnode->opt.serverPort; tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); + + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + pMgmt->selfIndex = pOption->selfIndex; + pMgmt->replica = pOption->replica; + memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) { dndInitMnodeOption(pDnode, pOption); - pOption->replica = 0; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + pOption->selfIndex = pMgmt->selfIndex; + pOption->replica = pMgmt->replica; + memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeMsg *pMsg) { @@ -299,6 +361,10 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC return -1; } + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + pMgmt->selfIndex = pOption->selfIndex; + pMgmt->replica = pOption->replica; + memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); return 0; }