serialize and persist local config

This commit is contained in:
xiao-77 2024-11-04 16:16:24 +08:00
parent bfceaae124
commit 07de144000
7 changed files with 139 additions and 16 deletions

View File

@ -282,6 +282,10 @@ void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
int8_t taosGranted(int8_t type);
int32_t taosSetSlowLogScope(char *pScopeStr, int32_t *pScope);
int32_t persistLocalConfig(SConfig *pCfg);
int32_t localConfigSerialize(SArray *array, char **serialized);
int32_t tSerializeSConfigArray(void *buf, int32_t bufLen, SArray *array);
int32_t tDeserializeSConfigArray(void *buf, int32_t bufLen, SArray *pReq);
#ifdef __cplusplus
}
#endif

View File

@ -421,7 +421,7 @@ typedef enum ENodeType {
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, // INACTIVE
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, // INACTIVE
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
@ -435,7 +435,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, // INACTIVE
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, // INACTIVE
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
@ -983,7 +983,6 @@ typedef struct SEpSet {
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
int32_t tEncodeSEpSet(SEncoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
@ -4115,20 +4114,20 @@ typedef struct {
SArray* blockTbName;
SArray* blockSchema;
union{
struct{
int64_t sleepTime;
union {
struct {
int64_t sleepTime;
};
struct{
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
struct {
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
};
};
} SMqDataRsp;
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pObj);
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteMqDataRsp(SMqDataRsp* pRsp);

View File

@ -17,10 +17,10 @@
#define _TD_MND_H_
#include "monitor.h"
#include "sync.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "trpc.h"
#include "sync.h"
#ifdef __cplusplus
extern "C" {
@ -41,6 +41,12 @@ typedef struct {
int64_t lastIndex;
} SMnodeOpt;
typedef struct {
int32_t version;
SArray *pArray;
TdThreadMutex mutex;
} SMnodeConfig;
/* ------------------------ SMnode ------------------------ */
/**
* @brief Open a mnode.
@ -73,7 +79,7 @@ int32_t mndStart(SMnode *pMnode);
*/
void mndStop(SMnode *pMnode);
int32_t mndIsCatchUp(SMnode *pMnode);
int32_t mndIsCatchUp(SMnode *pMnode);
ESyncRole mndGetRole(SMnode *pMnode);
int64_t mndGetTerm(SMnode *pMnode);
@ -109,7 +115,7 @@ int64_t mndGetRoleTimeMs(SMnode *pMnode);
* @param pMsg The request msg.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo);
int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo *pQueueInfo);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg);
void mndPostProcessQueryMsg(SRpcMsg *pMsg);

View File

@ -148,7 +148,8 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump);
void cfgDumpCfgS3(SConfig *pCfg, bool tsc, bool dump);
int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char *apolloUrl);
SArray *cfgGetLocalCfg(SConfig *pCfg);
SArray *cfgGetGlobalCfg(SConfig *pCfg);
#ifdef __cplusplus
}
#endif

View File

@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "cJSON.h"
#include "defines.h"
#include "os.h"
#include "tconfig.h"
@ -27,6 +28,9 @@
#include "cus_name.h"
#endif
#define CONFIG_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define CONFIG_FILE_LEN (CONFIG_PATH_LEN + 32)
// GRANT_CFG_DECLARE;
SConfig *tsCfg = NULL;
@ -1927,7 +1931,11 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
TAOS_CHECK_GOTO(taosSetAllDebugFlag(tsCfg, pItem->i32), &lino, _exit);
cfgDumpCfg(tsCfg, tsc, false);
if (!tsc) {
if ((code = persistLocalConfig(tsCfg)) != 0) {
goto _exit;
}
}
TAOS_CHECK_GOTO(taosCheckGlobalCfg(), &lino, _exit);
_exit:
@ -2473,3 +2481,91 @@ int8_t taosGranted(int8_t type) {
}
return 0;
}
int32_t persistLocalConfig(SConfig *pCfg) {
// TODO: just tmp ,refactor later
int32_t code = 0;
char *buffer = NULL;
TdFilePtr pFile = NULL;
char filepath[CONFIG_FILE_LEN] = {0};
char filename[CONFIG_FILE_LEN] = {0};
snprintf(filepath, sizeof(filepath), "%s%sconfig", tsDataDir, TD_DIRSEP);
snprintf(filename, sizeof(filename), "%s%sconfig%slocal.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
// TODO(beryl) need to check if the file is existed
if (taosMkDir(filepath) != 0) {
code = TAOS_SYSTEM_ERROR(errno);
uError("failed to create dir:%s since %s", filepath, tstrerror(code));
TAOS_RETURN(code);
}
TdFilePtr pConfigFile =
taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pConfigFile == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
uError("failed to open file:%s since %s", filename, tstrerror(code));
TAOS_RETURN(code);
}
char *serialized = NULL;
code = localConfigSerialize(cfgGetLocalCfg(pCfg), &serialized);
if (code != TSDB_CODE_SUCCESS) {
uError("failed to serialize local config since %s", tstrerror(code));
TAOS_RETURN(code);
}
taosWriteFile(pConfigFile, serialized, strlen(serialized));
return TSDB_CODE_SUCCESS;
}
int32_t localConfigSerialize(SArray *array, char **serialized) {
char buf[30];
cJSON *json = cJSON_CreateObject();
if (json == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int sz = taosArrayGetSize(array);
cJSON *cField = cJSON_CreateArray();
if (array == NULL) {
cJSON_Delete(json);
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
// cjson only support int32_t or double
// string are used to prohibit the loss of precision
for (int i = 0; i < sz; i++) {
SConfigItem *item = (SConfigItem *)taosArrayGet(array, i);
switch (item->dtype) {
{
case CFG_DTYPE_NONE:
break;
case CFG_DTYPE_BOOL:
cJSON_AddBoolToObject(cField, item->name, item->bval);
break;
case CFG_DTYPE_INT32:
cJSON_AddNumberToObject(cField, item->name, item->i32);
break;
case CFG_DTYPE_INT64:
(void)sprintf(buf, "%" PRId64, item->i64);
cJSON_AddStringToObject(cField, item->name, buf);
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
(void)sprintf(buf, "%f", item->fval);
cJSON_AddStringToObject(cField, item->name, buf);
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
cJSON_AddStringToObject(cField, item->name, item->str);
break;
}
}
}
cJSON_AddItemToObject(json, "pArray", cField);
*serialized = cJSON_Print(json);
cJSON_Delete(json);
return TSDB_CODE_SUCCESS;
}

View File

@ -45,6 +45,19 @@ static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
return code;
}
static void mmBuildConfigForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) {
pOption->deploy = true;
pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
pOption->selfIndex = 0;
pOption->numOfReplicas = 1;
pOption->numOfTotalReplicas = 1;
pOption->replicas[0].id = 1;
pOption->replicas[0].port = tsServerPort;
tstrncpy(pOption->replicas[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
pOption->lastIndex = SYNC_INDEX_INVALID;
}
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) {
pOption->deploy = true;
pOption->msgCb = pMgmt->msgCb;
@ -120,6 +133,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("mnode start to deploy");
pMgmt->pData->dnodeId = 1;
mmBuildOptionForDeploy(pMgmt, pInput, &option);
} else {
dInfo("mnode start to open");
mmBuildOptionForOpen(pMgmt, &option);

View File

@ -1536,3 +1536,6 @@ void cfgDestroyIter(SConfigIter *pIter) {
taosMemoryFree(pIter);
}
SArray *cfgGetLocalCfg(SConfig *pCfg) { return pCfg->localArray; }
SArray *cfgGetGlobalCfg(SConfig *pCfg) { return pCfg->globalArray; }