[TD-150] save vnode cfg use json format
This commit is contained in:
parent
273cc32249
commit
8e444435d1
|
@ -298,7 +298,7 @@ static bool dnodeReadMnodeInfos() {
|
|||
tsMnodeInfos.nodeInfos[i].syncPort = (uint16_t)syncPort->valueint;
|
||||
|
||||
cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName");
|
||||
if (!nodeIp || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
|
||||
if (!nodeName || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
|
||||
dError("failed to read mnode mgmtIpList.json, nodeName not found");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
@ -310,7 +310,7 @@ static bool dnodeReadMnodeInfos() {
|
|||
dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
|
||||
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
|
||||
dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId,
|
||||
taosIpStr(tsMnodeInfos.nodeInfos[i].nodeId), tsMnodeInfos.nodeInfos[i].nodePort,
|
||||
taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp), tsMnodeInfos.nodeInfos[i].nodePort,
|
||||
tsMnodeInfos.nodeInfos[i].nodeName);
|
||||
}
|
||||
|
||||
|
|
|
@ -130,21 +130,20 @@ static void dnodeCloseVnodes() {
|
|||
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
||||
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
|
||||
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
|
||||
pCreate->cfg.maxCacheSize = htobe64(pCreate->cfg.maxCacheSize);
|
||||
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
|
||||
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
|
||||
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
||||
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
|
||||
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
|
||||
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
|
||||
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
|
||||
pCreate->cfg.rowsInFileBlock = htonl(pCreate->cfg.rowsInFileBlock);
|
||||
pCreate->cfg.blocksPerTable = htons(pCreate->cfg.blocksPerTable);
|
||||
pCreate->cfg.cacheNumOfBlocks.totalBlocks = htonl(pCreate->cfg.cacheNumOfBlocks.totalBlocks);
|
||||
pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp);
|
||||
|
||||
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
|
||||
pCreate->vpeerDesc[j].vgId = htonl(pCreate->vpeerDesc[j].vgId);
|
||||
pCreate->vpeerDesc[j].dnodeId = htonl(pCreate->vpeerDesc[j].dnodeId);
|
||||
pCreate->vpeerDesc[j].ip = htonl(pCreate->vpeerDesc[j].ip);
|
||||
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
|
||||
pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp);
|
||||
}
|
||||
|
||||
return vnodeCreate(pCreate);
|
||||
|
@ -160,8 +159,21 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
|
|||
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
|
||||
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
||||
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
|
||||
pCreate->cfg.maxCacheSize = htobe64(pCreate->cfg.maxCacheSize);
|
||||
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
|
||||
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
|
||||
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
||||
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
|
||||
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
|
||||
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
|
||||
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
|
||||
pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp);
|
||||
|
||||
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
|
||||
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
|
||||
pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -245,12 +245,6 @@ typedef struct SSchema {
|
|||
int16_t bytes;
|
||||
} SSchema;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t dnodeId;
|
||||
uint32_t ip;
|
||||
} SVnodeDesc;
|
||||
|
||||
typedef struct {
|
||||
int32_t contLen;
|
||||
int32_t vgId;
|
||||
|
@ -521,9 +515,6 @@ typedef struct {
|
|||
uint8_t reserved[5];
|
||||
} SVnodeLoad;
|
||||
|
||||
/*
|
||||
* NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN / 4
|
||||
*/
|
||||
typedef struct {
|
||||
char acct[TSDB_USER_LEN + 1];
|
||||
char db[TSDB_DB_NAME_LEN + 1];
|
||||
|
@ -548,7 +539,7 @@ typedef struct {
|
|||
int8_t loadLatest; // load into mem or not
|
||||
uint8_t precision; // time resolution
|
||||
int8_t reserved[16];
|
||||
} SVnodeCfg, SDbCfg, SCMCreateDbMsg, SCMAlterDbMsg;
|
||||
} SDbCfg, SCMCreateDbMsg, SCMAlterDbMsg;
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_TABLE_ID_LEN + 1];
|
||||
|
@ -614,8 +605,35 @@ typedef struct {
|
|||
} SDMStatusRsp;
|
||||
|
||||
typedef struct {
|
||||
SVnodeCfg cfg;
|
||||
SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS];
|
||||
uint32_t vgId;
|
||||
int32_t maxTables;
|
||||
int64_t maxCacheSize;
|
||||
int32_t minRowsPerFileBlock;
|
||||
int32_t maxRowsPerFileBlock;
|
||||
int32_t daysPerFile;
|
||||
int32_t daysToKeep;
|
||||
int32_t daysToKeep1;
|
||||
int32_t daysToKeep2;
|
||||
int32_t commitTime;
|
||||
uint8_t precision; // time resolution
|
||||
int8_t compression;
|
||||
int8_t wals;
|
||||
int8_t commitLog;
|
||||
int8_t replications;
|
||||
int8_t quorum;
|
||||
uint32_t arbitratorIp;
|
||||
int8_t reserved[16];
|
||||
} SMDVnodeCfg;
|
||||
|
||||
typedef struct {
|
||||
int32_t nodeId;
|
||||
uint32_t nodeIp;
|
||||
char nodeName[TSDB_NODE_NAME_LEN + 1];
|
||||
} SMDVnodeDesc;
|
||||
|
||||
typedef struct {
|
||||
SMDVnodeCfg cfg;
|
||||
SMDVnodeDesc nodes[TSDB_MAX_MPEERS];
|
||||
} SMDCreateVnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
|
@ -673,9 +691,16 @@ typedef struct {
|
|||
int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM];
|
||||
} SSuperTableMetaMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t nodeId;
|
||||
uint32_t nodeIp;
|
||||
uint16_t nodePort;
|
||||
} SVnodeDesc;
|
||||
|
||||
typedef struct {
|
||||
SVnodeDesc vpeerDesc[TSDB_REPLICA_MAX_NUM];
|
||||
int16_t index; // used locally
|
||||
int32_t vgId;
|
||||
int32_t numOfSids;
|
||||
int32_t pSidExtInfoList[]; // offset value of STableIdInfo
|
||||
} SVnodeSidList;
|
||||
|
|
|
@ -444,7 +444,7 @@ static int32_t mgmtDropDnodeByIp(uint32_t ip) {
|
|||
return TSDB_CODE_NO_REMOVE_MASTER;
|
||||
}
|
||||
|
||||
#ifndef _VPEER
|
||||
#ifndef _SYNC
|
||||
return mgmtDropDnode(pDnode);
|
||||
#else
|
||||
return balanceDropDnode(pDnode);
|
||||
|
|
|
@ -506,27 +506,37 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
|
|||
SMDCreateVnodeMsg *pVnode = rpcMallocCont(sizeof(SMDCreateVnodeMsg));
|
||||
if (pVnode == NULL) return NULL;
|
||||
|
||||
pVnode->cfg = pDb->cfg;
|
||||
|
||||
SVnodeCfg *pCfg = &pVnode->cfg;
|
||||
SMDVnodeCfg *pCfg = &pVnode->cfg;
|
||||
pCfg->vgId = htonl(pVgroup->vgId);
|
||||
pCfg->maxSessions = htonl(pCfg->maxSessions);
|
||||
pCfg->cacheBlockSize = htonl(pCfg->cacheBlockSize);
|
||||
pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks);
|
||||
pCfg->daysPerFile = htonl(pCfg->daysPerFile);
|
||||
pCfg->daysToKeep1 = htonl(pCfg->daysToKeep1);
|
||||
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
|
||||
pCfg->daysToKeep = htonl(pCfg->daysToKeep);
|
||||
pCfg->commitTime = htonl(pCfg->commitTime);
|
||||
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
|
||||
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
|
||||
pCfg->maxTables = htonl(pDb->cfg.maxSessions);
|
||||
pCfg->maxCacheSize = htobe64((int64_t)pDb->cfg.cacheBlockSize * pDb->cfg.cacheNumOfBlocks.totalBlocks);
|
||||
pCfg->maxCacheSize = htobe64(-1);
|
||||
pCfg->minRowsPerFileBlock = htonl(-1);
|
||||
pCfg->maxRowsPerFileBlock = htonl(-1);
|
||||
pCfg->daysPerFile = htonl(pDb->cfg.daysPerFile);
|
||||
pCfg->daysToKeep1 = htonl(pDb->cfg.daysToKeep1);
|
||||
pCfg->daysToKeep2 = htonl(pDb->cfg.daysToKeep2);
|
||||
pCfg->daysToKeep = htonl(pDb->cfg.daysToKeep);
|
||||
pCfg->daysToKeep = htonl(-1);
|
||||
pCfg->commitTime = htonl(pDb->cfg.commitTime);
|
||||
pCfg->precision = pDb->cfg.precision;
|
||||
pCfg->compression = pDb->cfg.compression;
|
||||
pCfg->compression = -1;
|
||||
pCfg->wals = 3;
|
||||
pCfg->commitLog = pDb->cfg.commitLog;
|
||||
pCfg->replications = (int8_t) pVgroup->numOfVnodes;
|
||||
pCfg->quorum = 1;
|
||||
pCfg->arbitratorIp = htonl(pVgroup->vnodeGid[0].privateIp);
|
||||
|
||||
SVnodeDesc *vpeerDesc = pVnode->vpeerDesc;
|
||||
SMDVnodeDesc *pNodes = pVnode->nodes;
|
||||
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
|
||||
vpeerDesc[j].vgId = htonl(pVgroup->vgId);
|
||||
vpeerDesc[j].dnodeId = htonl(pVgroup->vnodeGid[j].dnodeId);
|
||||
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp);
|
||||
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[j].dnodeId);
|
||||
if (pDnode != NULL) {
|
||||
pNodes[j].nodeId = htonl(pDnode->dnodeId);
|
||||
pNodes[j].nodeIp = htonl(pDnode->privateIp);
|
||||
strcpy(pNodes[j].nodeName, pDnode->dnodeName);
|
||||
mgmtReleaseDnode(pDnode);
|
||||
}
|
||||
}
|
||||
|
||||
return pVnode;
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
|
||||
#define TSDB_MIN_ID 0
|
||||
#define TSDB_MAX_ID INT_MAX
|
||||
#define TSDB_MIN_TABLES 10
|
||||
#define TSDB_MIN_TABLES 4
|
||||
#define TSDB_MAX_TABLES 100000
|
||||
#define TSDB_DEFAULT_TABLES 1000
|
||||
#define TSDB_DEFAULT_DAYS_PER_FILE 10
|
||||
|
|
|
@ -9,6 +9,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
|||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(inc)
|
||||
AUX_SOURCE_DIRECTORY(src SRC)
|
||||
|
|
|
@ -18,10 +18,12 @@
|
|||
#include "ihash.h"
|
||||
#include "taoserror.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tutil.h"
|
||||
#include "trpc.h"
|
||||
#include "tsdb.h"
|
||||
#include "ttime.h"
|
||||
#include "ttimer.h"
|
||||
#include "cJSON.h"
|
||||
#include "twal.h"
|
||||
#include "tglobal.h"
|
||||
#include "dnode.h"
|
||||
|
@ -93,21 +95,21 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
|
||||
STsdbCfg tsdbCfg = {0};
|
||||
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
||||
tsdbCfg.compression = -1;
|
||||
tsdbCfg.compression = pVnodeCfg->cfg.compression;;
|
||||
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
|
||||
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
|
||||
tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
|
||||
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
|
||||
tsdbCfg.minRowsPerFileBlock = -1;
|
||||
tsdbCfg.maxRowsPerFileBlock = -1;
|
||||
tsdbCfg.keep = -1;
|
||||
tsdbCfg.maxCacheSize = -1;
|
||||
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
|
||||
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
|
||||
tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep;
|
||||
tsdbCfg.maxCacheSize = pVnodeCfg->cfg.maxCacheSize;
|
||||
|
||||
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
||||
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
|
||||
return terrno;
|
||||
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
|
||||
return TSDB_CODE_VG_INIT_FAILED;
|
||||
}
|
||||
|
||||
dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog);
|
||||
|
@ -328,88 +330,235 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
|||
}
|
||||
|
||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
|
||||
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||
|
||||
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
|
||||
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||
FILE *fp = fopen(cfgFile, "w");
|
||||
if (!fp) return errno;
|
||||
if (!fp) {
|
||||
dError("vgId:%d, failed to open vnode cfg file for write, error:%s", pVnodeCfg->cfg.vgId, strerror(errno));
|
||||
return errno;
|
||||
}
|
||||
|
||||
fprintf(fp, "commitLog %d\n", pVnodeCfg->cfg.commitLog);
|
||||
fprintf(fp, "wals %d\n", 3);
|
||||
fprintf(fp, "arbitratorIp %d\n", pVnodeCfg->vpeerDesc[0].ip);
|
||||
fprintf(fp, "quorum %d\n", 1);
|
||||
fprintf(fp, "replica %d\n", pVnodeCfg->cfg.replications);
|
||||
char ipStr[20];
|
||||
int32_t len = 0;
|
||||
int32_t maxLen = 1000;
|
||||
char * content = calloc(1, maxLen + 1);
|
||||
|
||||
len += snprintf(content + len, maxLen - len, "{\n");
|
||||
|
||||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
|
||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile);
|
||||
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep);
|
||||
|
||||
len += snprintf(content + len, maxLen - len, " \"maxCacheSize\": %" PRId64 ",\n", pVnodeCfg->cfg.maxCacheSize);
|
||||
|
||||
len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog);
|
||||
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
|
||||
|
||||
uint32_t ipInt = pVnodeCfg->cfg.arbitratorIp;
|
||||
sprintf(ipStr, "%u.%u.%u.%u", ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
|
||||
len += snprintf(content + len, maxLen - len, " \"arbitratorIp\": \"%s\",\n", ipStr);
|
||||
|
||||
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
|
||||
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
|
||||
|
||||
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
|
||||
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
|
||||
fprintf(fp, "index%d nodeId %d nodeIp %u name n%d\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip, pVnodeCfg->vpeerDesc[i].dnodeId);
|
||||
}
|
||||
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
|
||||
|
||||
uint32_t ipInt = pVnodeCfg->nodes[i].nodeIp;
|
||||
sprintf(ipStr, "%u.%u.%u.%u", ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
|
||||
len += snprintf(content + len, maxLen - len, " \"nodeIp\": \"%s\",\n", ipStr);
|
||||
|
||||
len += snprintf(content + len, maxLen - len, " \"nodeName\": \"%s\"\n", pVnodeCfg->nodes[i].nodeName);
|
||||
|
||||
if (i < pVnodeCfg->cfg.replications - 1) {
|
||||
len += snprintf(content + len, maxLen - len, " },{\n");
|
||||
} else {
|
||||
len += snprintf(content + len, maxLen - len, " }]\n");
|
||||
}
|
||||
}
|
||||
len += snprintf(content + len, maxLen - len, "}\n");
|
||||
|
||||
fwrite(content, 1, len, fp);
|
||||
fclose(fp);
|
||||
dTrace("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);
|
||||
free(content);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
dPrint("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TODO: this is a simple implement
|
||||
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
||||
char option[5][16] = {0};
|
||||
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
|
||||
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId);
|
||||
|
||||
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
|
||||
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId);
|
||||
FILE *fp = fopen(cfgFile, "r");
|
||||
if (!fp) return errno;
|
||||
if (!fp) {
|
||||
dError("pVnode:%p vgId:%d, failed to open vnode cfg file for read, error:%s", pVnode, pVnode->vgId, strerror(errno));
|
||||
return errno;
|
||||
}
|
||||
|
||||
int32_t commitLog = -1;
|
||||
int32_t num = fscanf(fp, "%s %d", option[0], &commitLog);
|
||||
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (strcmp(option[0], "commitLog") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (commitLog == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
pVnode->walCfg.commitLog = (int8_t)commitLog;
|
||||
int ret = TSDB_CODE_OTHERS;
|
||||
int maxLen = 1000;
|
||||
char *content = calloc(1, maxLen + 1);
|
||||
int len = fread(content, 1, maxLen, fp);
|
||||
if (len <= 0) {
|
||||
free(content);
|
||||
fclose(fp);
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, content is null", pVnode, pVnode->vgId);
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t wals = -1;
|
||||
num = fscanf(fp, "%s %d", option[0], &wals);
|
||||
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (strcmp(option[0], "wals") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (wals == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
pVnode->walCfg.wals = (int8_t)wals;
|
||||
cJSON *root = cJSON_Parse(content);
|
||||
if (root == NULL) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, invalid json format", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON *precision = cJSON_GetObjectItem(root, "precision");
|
||||
if (!precision || precision->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, precision not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.precision = (int8_t)precision->valueint;
|
||||
|
||||
cJSON *compression = cJSON_GetObjectItem(root, "compression");
|
||||
if (!compression || compression->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, compression not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.compression = (int8_t)compression->valueint;
|
||||
|
||||
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||
if (!maxTables || maxTables->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, maxTables not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||
|
||||
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
|
||||
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, daysPerFile not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint;
|
||||
|
||||
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
|
||||
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, minRowsPerFileBlock not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint;
|
||||
|
||||
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
|
||||
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, maxRowsPerFileBlock not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
|
||||
|
||||
cJSON *keep = cJSON_GetObjectItem(root, "keep");
|
||||
if (!keep || keep->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, keep not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.keep = keep->valueint;
|
||||
|
||||
cJSON *maxCacheSize = cJSON_GetObjectItem(root, "maxCacheSize");
|
||||
if (!maxCacheSize || maxCacheSize->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, maxCacheSize not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.maxCacheSize = maxCacheSize->valueint;
|
||||
|
||||
cJSON *commitLog = cJSON_GetObjectItem(root, "commitLog");
|
||||
if (!commitLog || commitLog->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, commitLog not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->walCfg.commitLog = (int8_t)commitLog->valueint;
|
||||
|
||||
cJSON *wals = cJSON_GetObjectItem(root, "wals");
|
||||
if (!wals || wals->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, wals not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->walCfg.wals = (int8_t)wals->valueint;
|
||||
pVnode->walCfg.keep = 0;
|
||||
|
||||
int32_t arbitratorIp = -1;
|
||||
num = fscanf(fp, "%s %u", option[0], &arbitratorIp);
|
||||
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (strcmp(option[0], "arbitratorIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (arbitratorIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
pVnode->syncCfg.arbitratorIp = arbitratorIp;
|
||||
cJSON *arbitratorIp = cJSON_GetObjectItem(root, "arbitratorIp");
|
||||
if (!arbitratorIp || arbitratorIp->type != cJSON_String || arbitratorIp->valuestring == NULL) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, arbitratorIp not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->syncCfg.arbitratorIp = inet_addr(arbitratorIp->valuestring);
|
||||
|
||||
int32_t quorum = -1;
|
||||
num = fscanf(fp, "%s %d", option[0], &quorum);
|
||||
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (strcmp(option[0], "quorum") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (quorum == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
pVnode->syncCfg.quorum = (int8_t)quorum;
|
||||
cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
|
||||
if (!quorum || quorum->type != cJSON_Number) {
|
||||
dError("failed to vnode cfg, quorum not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->syncCfg.quorum = (int8_t)quorum->valueint;
|
||||
|
||||
int32_t replica = -1;
|
||||
num = fscanf(fp, "%s %d", option[0], &replica);
|
||||
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (strcmp(option[0], "replica") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (replica == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
pVnode->syncCfg.replica = (int8_t)replica;
|
||||
cJSON *replica = cJSON_GetObjectItem(root, "replica");
|
||||
if (!replica || replica->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, replica not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->syncCfg.replica = (int8_t)replica->valueint;
|
||||
|
||||
for (int32_t i = 0; i < replica; ++i) {
|
||||
int32_t dnodeId = -1;
|
||||
uint32_t dnodeIp = -1;
|
||||
num = fscanf(fp, "%s %s %d %s %u %s %s", option[0], option[1], &dnodeId, option[2], &dnodeIp, option[3], pVnode->syncCfg.nodeInfo[i].name);
|
||||
if (num != 7) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (strcmp(option[1], "nodeId") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (strcmp(option[2], "nodeIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (strcmp(option[3], "name") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (dnodeId == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
if (dnodeIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
|
||||
pVnode->syncCfg.nodeInfo[i].nodeId = dnodeId;
|
||||
pVnode->syncCfg.nodeInfo[i].nodeIp = dnodeIp;
|
||||
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
|
||||
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeInfos not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
int size = cJSON_GetArraySize(nodeInfos);
|
||||
if (size != pVnode->syncCfg.replica) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeInfos size not matched", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
for (int i = 0; i < size; ++i) {
|
||||
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
|
||||
if (nodeInfo == NULL) continue;
|
||||
|
||||
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
|
||||
if (!nodeId || nodeId->type != cJSON_Number) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeId not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint;
|
||||
|
||||
cJSON *nodeIp = cJSON_GetObjectItem(nodeInfo, "nodeIp");
|
||||
if (!nodeIp || nodeIp->type != cJSON_String || nodeIp->valuestring == NULL) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeIp not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->syncCfg.nodeInfo[i].nodeIp = inet_addr(nodeIp->valuestring);
|
||||
|
||||
cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName");
|
||||
if (!nodeName || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
|
||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeName not found", pVnode, pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
strncpy(pVnode->syncCfg.nodeInfo[i].name, nodeName->valuestring, TSDB_NODE_NAME_LEN);
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
|
||||
dPrint("pVnode:%p vgId:%d, read vnode cfg successed, replcia:%d", pVnode, pVnode->vgId, pVnode->syncCfg.replica);
|
||||
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
|
||||
dPrint("pVnode:%p vgId:%d, dnode:%d, ip:%s name:%s", pVnode, pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
|
||||
taosIpStr(pVnode->syncCfg.nodeInfo[i].nodeIp), pVnode->syncCfg.nodeInfo[i].name);
|
||||
}
|
||||
|
||||
PARSE_OVER:
|
||||
free(content);
|
||||
cJSON_Delete(root);
|
||||
fclose(fp);
|
||||
dTrace("pVnode:%p vgId:%d, read vnode cfg successed", pVnode, pVnode->vgId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -93,7 +93,7 @@ echo "privateIp $NODE_IP" >> $TAOS_CFG
|
|||
echo "dDebugFlag 199" >> $TAOS_CFG
|
||||
echo "mDebugFlag 199" >> $TAOS_CFG
|
||||
echo "sdbDebugFlag 199" >> $TAOS_CFG
|
||||
echo "rpcDebugFlag 135" >> $TAOS_CFG
|
||||
echo "rpcDebugFlag 131" >> $TAOS_CFG
|
||||
echo "tmrDebugFlag 131" >> $TAOS_CFG
|
||||
echo "cDebugFlag 135" >> $TAOS_CFG
|
||||
echo "httpDebugFlag 131" >> $TAOS_CFG
|
||||
|
@ -105,7 +105,7 @@ echo "numOfThreadsPerCore 2.0" >> $TAOS_CFG
|
|||
echo "defaultPass taosdata" >> $TAOS_CFG
|
||||
echo "numOfLogLines 100000000" >> $TAOS_CFG
|
||||
echo "mgmtEqualVnodeNum 0" >> $TAOS_CFG
|
||||
echo "clog 0" >> $TAOS_CFG
|
||||
echo "clog 2" >> $TAOS_CFG
|
||||
echo "statusInterval 1" >> $TAOS_CFG
|
||||
echo "numOfTotalVnodes 4" >> $TAOS_CFG
|
||||
echo "asyncLog 0" >> $TAOS_CFG
|
||||
|
|
|
@ -34,6 +34,8 @@ cd .
|
|||
sh/ip.sh -i 1 -s up > /dev/null 2>&1 &
|
||||
sh/ip.sh -i 2 -s up > /dev/null 2>&1 &
|
||||
sh/ip.sh -i 3 -s up > /dev/null 2>&1 &
|
||||
sh/ip.sh -i 4 -s up > /dev/null 2>&1 &
|
||||
sh/ip.sh -i 5 -s up > /dev/null 2>&1 &
|
||||
|
||||
# Get responsible directories
|
||||
CODE_DIR=`dirname $0`
|
||||
|
|
Loading…
Reference in New Issue