add process config rsp.

This commit is contained in:
xiao-77 2024-11-11 16:52:47 +08:00
parent f931119e14
commit 8a90ebaa38
7 changed files with 145 additions and 21 deletions

View File

@ -295,12 +295,14 @@ void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
int8_t taosGranted(int8_t type); int8_t taosGranted(int8_t type);
int32_t taosSetSlowLogScope(char *pScopeStr, int32_t *pScope); int32_t taosSetSlowLogScope(char *pScopeStr, int32_t *pScope);
int32_t persistGlobalConfig(const char *path, int32_t version); int32_t persistGlobalConfig(SArray *array, const char *path, int32_t version);
int32_t persistLocalConfig(const char *path); int32_t persistLocalConfig(const char *path);
int32_t localConfigSerialize(SArray *array, char **serialized); int32_t localConfigSerialize(SArray *array, char **serialized);
int32_t tSerializeSConfigArray(SEncoder *pEncoder, SArray *array); int32_t tSerializeSConfigArray(SEncoder *pEncoder, SArray *array);
int32_t tDeserializeSConfigArray(SDecoder *pDecoder, SArray *array); int32_t tDeserializeSConfigArray(SDecoder *pDecoder, SArray *array);
void printConfigNotMatch(SArray *array);
int32_t compareSConfigItemArrays(SArray *mArray, const SArray *dArray, SArray *diffArray); int32_t compareSConfigItemArrays(SArray *mArray, const SArray *dArray, SArray *diffArray);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -1897,6 +1897,7 @@ typedef struct {
int32_t forceReadConfig; int32_t forceReadConfig;
int32_t isConifgVerified; int32_t isConifgVerified;
int32_t isVersionVerified; int32_t isVersionVerified;
int32_t cver;
SArray* array; SArray* array;
} SConfigRsp; } SConfigRsp;

View File

@ -2609,7 +2609,7 @@ int32_t localConfigSerialize(SArray *array, char **serialized) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t persistGlobalConfig(const char *path, int32_t version) { int32_t persistGlobalConfig(SArray *array, const char *path, int32_t version) {
// TODO: just tmp ,refactor later // TODO: just tmp ,refactor later
int32_t code = 0; int32_t code = 0;
char *buffer = NULL; char *buffer = NULL;
@ -2635,7 +2635,7 @@ int32_t persistGlobalConfig(const char *path, int32_t version) {
TAOS_RETURN(code); TAOS_RETURN(code);
} }
char *serialized = NULL; char *serialized = NULL;
code = globalConfigSerialize(version, cfgGetGlobalCfg(tsCfg), &serialized); code = globalConfigSerialize(version, array, &serialized);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("failed to serialize local config since %s", tstrerror(code)); uError("failed to serialize local config since %s", tstrerror(code));
TAOS_RETURN(code); TAOS_RETURN(code);
@ -2682,11 +2682,12 @@ int32_t persistLocalConfig(const char *path) {
int32_t tSerializeSConfigArray(SEncoder *pEncoder, SArray *array) { int32_t tSerializeSConfigArray(SEncoder *pEncoder, SArray *array) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int sz = taosArrayGetSize(array); int32_t sz = taosArrayGetSize(array);
tEncodeI32(pEncoder, sz);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SConfigItem *item = (SConfigItem *)taosArrayGet(array, i); SConfigItem *item = (SConfigItem *)taosArrayGet(array, i);
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, item->name)); TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, item->name));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, item->dtype));
switch (item->dtype) { switch (item->dtype) {
{ {
case CFG_DTYPE_NONE: case CFG_DTYPE_NONE:
@ -2719,36 +2720,38 @@ _exit:
} }
int32_t tDeserializeSConfigArray(SDecoder *pDecoder, SArray *array) { int32_t tDeserializeSConfigArray(SDecoder *pDecoder, SArray *array) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int sz = taosArrayGetSize(array); int32_t sz = 0;
ECfgDataType dtype = CFG_DTYPE_NONE;
tDecodeI32(pDecoder, &sz);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SConfigItem *item = (SConfigItem *)taosArrayGet(array, i); SConfigItem item = {0};
TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &item->name)); TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &item.name));
switch (item->dtype) { TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t *)&dtype));
switch (dtype) {
{ {
case CFG_DTYPE_NONE: case CFG_DTYPE_NONE:
break; break;
case CFG_DTYPE_BOOL: case CFG_DTYPE_BOOL:
TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &item->bval)); TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &item.bval));
break; break;
case CFG_DTYPE_INT32: case CFG_DTYPE_INT32:
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &item->i32)); TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &item.i32));
break; break;
case CFG_DTYPE_INT64: case CFG_DTYPE_INT64:
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &item->i64)); TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &item.i64));
break; break;
case CFG_DTYPE_FLOAT: case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: case CFG_DTYPE_DOUBLE:
TAOS_CHECK_EXIT(tDecodeFloat(pDecoder, &item->fval)); TAOS_CHECK_EXIT(tDecodeFloat(pDecoder, &item.fval));
break; break;
case CFG_DTYPE_STRING: case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR: case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE: case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET: case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE: case CFG_DTYPE_TIMEZONE:
TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &item->str)); TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &item.str));
break; break;
} }
} }
@ -2803,4 +2806,78 @@ int32_t compareSConfigItemArrays(SArray *mArray, const SArray *dArray, SArray *d
} }
return code; return code;
}
void printConfigNotMatch(SArray *array) {
uError(
"The global configuration parameters in the configuration file do not match those in the cluster. Please "
"turn off the forceReadConfigFile option or modify the global configuration parameters that are not "
"configured.");
int32_t sz = taosArrayGetSize(array);
for (int i = 0; i < sz; i++) {
SConfigItem *item = (SConfigItem *)taosArrayGet(array, i);
switch (item->dtype) {
{
case CFG_DTYPE_NONE:
break;
case CFG_DTYPE_BOOL:
uError("config %s in cluster value is:%d", item->name, item->bval);
break;
case CFG_DTYPE_INT32:
uError("config %s in cluster value is:%d", item->name, item->i32);
break;
case CFG_DTYPE_INT64:
uError("config %s in cluster value is:%" PRId64, item->name, item->i64);
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
uError("config %s in cluster value is:%f", item->name, item->fval);
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
uError("config %s in cluster value is:%s", item->name, item->str);
break;
}
}
}
}
void printConfigNotMatch(SArray *array) {
uError(
"The global configuration parameters in the configuration file do not match those in the cluster. Please "
"turn off the forceReadConfigFile option or modify the global configuration parameters that are not "
"configured.");
int32_t sz = taosArrayGetSize(array);
for (int i = 0; i < sz; i++) {
SConfigItem *item = (SConfigItem *)taosArrayGet(array, i);
switch (item->dtype) {
{
case CFG_DTYPE_NONE:
break;
case CFG_DTYPE_BOOL:
uError("config %s in cluster value is:%d", item->name, item->bval);
break;
case CFG_DTYPE_INT32:
uError("config %s in cluster value is:%d", item->name, item->i32);
break;
case CFG_DTYPE_INT64:
uError("config %s in cluster value is:%" PRId64, item->name, item->i64);
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
uError("config %s in cluster value is:%f", item->name, item->fval);
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
uError("config %s in cluster value is:%s", item->name, item->str);
break;
}
}
}
} }

View File

@ -1653,6 +1653,7 @@ int32_t tSerializeSConfigRsp(void *buf, int32_t bufLen, SConfigRsp *pRsp) {
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->forceReadConfig)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->forceReadConfig));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->isConifgVerified)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->isConifgVerified));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->isVersionVerified)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->isVersionVerified));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->cver));
if ((!pRsp->isConifgVerified) || (!pRsp->isVersionVerified)) { if ((!pRsp->isConifgVerified) || (!pRsp->isVersionVerified)) {
TAOS_CHECK_EXIT(tSerializeSConfigArray(&encoder, pRsp->array)); TAOS_CHECK_EXIT(tSerializeSConfigArray(&encoder, pRsp->array));
} }
@ -1684,7 +1685,8 @@ _exit:
tDecoderClear(&decoder); tDecoderClear(&decoder);
return code; return code;
} }
void tFreeSConfigRsp(SConfigRsp *pRsp);
void tFreeSConfigRsp(SConfigRsp *pRsp) { taosArrayDestroy(pRsp->array); }
int32_t tSerializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) { int32_t tSerializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) {
int32_t code = 0, lino = 0; int32_t code = 0, lino = 0;

View File

@ -283,6 +283,45 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dmProcessStatusRsp(pMgmt, &rpcRsp); dmProcessStatusRsp(pMgmt, &rpcRsp);
} }
static void dmProcessConfigRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
const STraceId *trace = &pRsp->info.traceId;
SConfigRsp configRsp = {0};
dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
if (pRsp->code != 0) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
pMgmt->statusSeq);
pMgmt->pData->dropped = 1;
if (dmWriteEps(pMgmt->pData) != 0) {
dError("failed to write dnode file");
}
dInfo("dnode will exit since it is in the dropped state");
(void)raise(SIGINT);
}
} else {
if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
tDeserializeSConfigRsp(pRsp->pCont, pRsp->contLen, &configRsp) == 0) {
if (configRsp.forceReadConfig) {
if (configRsp.isConifgVerified) {
persistGlobalConfig(cfgGetGlobalCfg(tsCfg), pMgmt->path, configRsp.cver);
} else {
// log the difference configurations
printConfigNotMatch(configRsp.array);
}
goto _exit;
}
if (!configRsp.isVersionVerified) {
persistGlobalConfig(cfgGetGlobalCfg(tsCfg), pMgmt->path, configRsp.cver);
}
}
}
_exit:
tFreeSConfigRsp(&configRsp);
rpcFreeCont(pRsp->pCont);
}
void dmSendConfigReq(SDnodeMgmt *pMgmt) { void dmSendConfigReq(SDnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
SConfigReq req = {0}; SConfigReq req = {0};
@ -327,10 +366,11 @@ void dmSendConfigReq(SDnodeMgmt *pMgmt) {
dError("failed to send status req since %s", tstrerror(code)); dError("failed to send status req since %s", tstrerror(code));
return; return;
} }
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
} else { dError("failed to send config req since %s", tstrerror(rpcRsp.code));
return;
} }
dmProcessConfigRsp(pMgmt, &rpcRsp);
} }
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) { void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {

View File

@ -45,7 +45,7 @@ static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
return code; return code;
} }
static void mmBuildConfigForDeploy(SMnodeMgmt *pMgmt) { persistGlobalConfig(pMgmt->path, 0); } static void mmBuildConfigForDeploy(SMnodeMgmt *pMgmt) { persistGlobalConfig(cfgGetGlobalCfg(tsCfg), pMgmt->path, 0); }
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) { static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) {
pOption->deploy = true; pOption->deploy = true;

View File

@ -934,6 +934,7 @@ static int32_t mndProcessConfigReq(SRpcMsg *pReq) {
SArray *diffArray = taosArrayInit(16, sizeof(SConfigItem)); SArray *diffArray = taosArrayInit(16, sizeof(SConfigItem));
SConfigRsp configRsp = {0}; SConfigRsp configRsp = {0};
configRsp.forceReadConfig = configReq.forceReadConfig; configRsp.forceReadConfig = configReq.forceReadConfig;
configRsp.cver = tsConfigVersion;
if (configRsp.forceReadConfig) { if (configRsp.forceReadConfig) {
// compare config array from configReq with current config array // compare config array from configReq with current config array
if (compareSConfigItemArrays(cfgGetGlobalCfg(tsCfg), configReq.array, diffArray)) { if (compareSConfigItemArrays(cfgGetGlobalCfg(tsCfg), configReq.array, diffArray)) {
@ -960,6 +961,7 @@ static int32_t mndProcessConfigReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
_OVER: _OVER:
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
return mndUpdClusterInfo(pReq); return mndUpdClusterInfo(pReq);
} }