Add encode and decode rpc config msg

This commit is contained in:
xiao-77 2024-11-11 11:33:10 +08:00
parent 8355be28d9
commit f931119e14
5 changed files with 131 additions and 24 deletions

View File

@ -42,6 +42,8 @@ typedef enum {
DND_CS_MNODE_WAL = 8, DND_CS_MNODE_WAL = 8,
} EEncryptScope; } EEncryptScope;
extern SConfig *tsCfg;
// cluster // cluster
extern char tsFirst[]; extern char tsFirst[];
extern char tsSecond[]; extern char tsSecond[];
@ -296,8 +298,10 @@ int32_t taosSetSlowLogScope(char *pScopeStr, int32_t *pScope);
int32_t persistGlobalConfig(const char *path, int32_t version); int32_t persistGlobalConfig(const char *path, int32_t version);
int32_t persistLocalConfig(const char *path); int32_t persistLocalConfig(const char *path);
int32_t localConfigSerialize(SArray *array, char **serialized); int32_t localConfigSerialize(SArray *array, char **serialized);
int32_t tSerializeSConfigArray(SEncoder *pEncoder, int32_t bufLen, SArray *array); int32_t tSerializeSConfigArray(SEncoder *pEncoder, SArray *array);
int32_t tDeserializeSConfigArray(SDecoder *pDecoder, int32_t bufLen, SArray *array); int32_t tDeserializeSConfigArray(SDecoder *pDecoder, SArray *array);
int32_t compareSConfigItemArrays(SArray *mArray, const SArray *dArray, SArray *diffArray);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1897,7 +1897,7 @@ typedef struct {
int32_t forceReadConfig; int32_t forceReadConfig;
int32_t isConifgVerified; int32_t isConifgVerified;
int32_t isVersionVerified; int32_t isVersionVerified;
SArray* pArray; SArray* array;
} SConfigRsp; } SConfigRsp;
int32_t tSerializeSConfigRsp(void* buf, int32_t bufLen, SConfigRsp* pRsp); int32_t tSerializeSConfigRsp(void* buf, int32_t bufLen, SConfigRsp* pRsp);

View File

@ -2679,13 +2679,14 @@ int32_t persistLocalConfig(const char *path) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tSerializeSConfigArray(SEncoder *pEncoder, int32_t bufLen, SArray *array) { int32_t tSerializeSConfigArray(SEncoder *pEncoder, SArray *array) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int sz = taosArrayGetSize(array); int sz = taosArrayGetSize(array);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SConfigItem *item = (SConfigItem *)taosArrayGet(array, i); SConfigItem *item = (SConfigItem *)taosArrayGet(array, i);
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, item->name));
switch (item->dtype) { switch (item->dtype) {
{ {
case CFG_DTYPE_NONE: case CFG_DTYPE_NONE:
@ -2717,13 +2718,14 @@ _exit:
return code; return code;
} }
int32_t tDeserializeSConfigArray(SDecoder *pDecoder, int32_t bufLen, SArray *array) { int32_t tDeserializeSConfigArray(SDecoder *pDecoder, SArray *array) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int sz = taosArrayGetSize(array); int sz = taosArrayGetSize(array);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SConfigItem *item = (SConfigItem *)taosArrayGet(array, i); SConfigItem *item = (SConfigItem *)taosArrayGet(array, i);
TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &item->name));
switch (item->dtype) { switch (item->dtype) {
{ {
case CFG_DTYPE_NONE: case CFG_DTYPE_NONE:
@ -2754,3 +2756,51 @@ int32_t tDeserializeSConfigArray(SDecoder *pDecoder, int32_t bufLen, SArray *arr
_exit: _exit:
return code; return code;
} }
bool compareSConfigItem(const SConfigItem *item1, const SConfigItem *item2) {
switch (item1->dtype) {
case CFG_DTYPE_BOOL:
if (item1->bval != item2->bval) return false;
break;
case CFG_DTYPE_FLOAT:
if (item1->fval != item2->fval) return false;
break;
case CFG_DTYPE_INT32:
if (item1->i32 != item2->i32) return false;
break;
case CFG_DTYPE_INT64:
if (item1->i64 != item2->i64) return false;
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
if (strcmp(item1->str, item2->str) != 0) return false;
break;
default:
return false;
}
return true;
}
int32_t compareSConfigItemArrays(SArray *mArray, const SArray *dArray, SArray *diffArray) {
int32_t code = 0;
int32_t msz = taosArrayGetSize(mArray);
int32_t dsz = taosArrayGetSize(dArray);
if (msz != dsz) {
return TSDB_CODE_FAILED;
}
for (int i = 0; i < msz; i++) {
SConfigItem *mItem = (SConfigItem *)taosArrayGet(mArray, i);
SConfigItem *dItem = (SConfigItem *)taosArrayGet(dArray, i);
if (!compareSConfigItem(mItem, dItem)) {
code = TSDB_CODE_FAILED;
taosArrayPush(diffArray, mItem);
}
}
return code;
}

View File

@ -14,6 +14,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
#undef TD_MSG_NUMBER_ #undef TD_MSG_NUMBER_
@ -1600,6 +1601,7 @@ _exit:
return code; return code;
} }
void tFreeSStatusReq(SStatusReq *pReq) { taosArrayDestroy(pReq->pVloads); }
int32_t tSerializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) { int32_t tSerializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
@ -1608,12 +1610,11 @@ int32_t tSerializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) {
int32_t tlen; int32_t tlen;
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartEncode(&encoder)); TAOS_CHECK_EXIT(tStartEncode(&encoder));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->cver));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->forceReadConfig)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->forceReadConfig));
if (pReq->forceReadConfig) { if (pReq->forceReadConfig) {
} else { TAOS_CHECK_EXIT(tSerializeSConfigArray(&encoder, pReq->array));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->cver));
} }
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->cver));
tEndEncode(&encoder); tEndEncode(&encoder);
_exit: _exit:
if (code) { if (code) {
@ -1632,13 +1633,58 @@ int32_t tDeserializeSConfigReq(void *buf, int32_t bufLen, SConfigReq *pReq) {
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartDecode(&decoder)); TAOS_CHECK_EXIT(tStartDecode(&decoder));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->cver)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->cver));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->forceReadConfig));
if (pReq->forceReadConfig) {
TAOS_CHECK_EXIT(tDeserializeSConfigArray(&decoder, pReq->array));
}
tEndDecode(&decoder); tEndDecode(&decoder);
_exit: _exit:
tDecoderClear(&decoder); tDecoderClear(&decoder);
return code; return code;
} }
void tFreeSStatusReq(SStatusReq *pReq) { taosArrayDestroy(pReq->pVloads); } int32_t tSerializeSConfigRsp(void *buf, int32_t bufLen, SConfigRsp *pRsp) {
SEncoder encoder = {0};
int32_t code = 0;
int32_t lino;
int32_t tlen;
tEncoderInit(&encoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartEncode(&encoder));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->forceReadConfig));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->isConifgVerified));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->isVersionVerified));
if ((!pRsp->isConifgVerified) || (!pRsp->isVersionVerified)) {
TAOS_CHECK_EXIT(tSerializeSConfigArray(&encoder, pRsp->array));
}
tEndEncode(&encoder);
_exit:
if (code) {
tlen = code;
} else {
tlen = encoder.pos;
}
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSConfigRsp(void *buf, int32_t bufLen, SConfigRsp *pRsp) {
SDecoder decoder = {0};
int32_t code = 0;
int32_t lino;
tDecoderInit(&decoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartDecode(&decoder));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pRsp->forceReadConfig));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pRsp->isConifgVerified));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pRsp->isVersionVerified));
if ((!pRsp->isConifgVerified) || (!pRsp->isVersionVerified)) {
TAOS_CHECK_EXIT(tDeserializeSConfigArray(&decoder, pRsp->array));
}
_exit:
tEndDecode(&decoder);
tDecoderClear(&decoder);
return code;
}
void tFreeSConfigRsp(SConfigRsp *pRsp);
int32_t tSerializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) { int32_t tSerializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) {
int32_t code = 0, lino = 0; int32_t code = 0, lino = 0;

View File

@ -28,6 +28,7 @@
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "taos_monitor.h" #include "taos_monitor.h"
#include "tconfig.h"
#include "tjson.h" #include "tjson.h"
#include "tmisce.h" #include "tmisce.h"
#include "tunit.h" #include "tunit.h"
@ -930,24 +931,30 @@ static int32_t mndProcessConfigReq(SRpcMsg *pReq) {
int32_t code = -1; int32_t code = -1;
tDeserializeSConfigReq(pReq->pCont, pReq->contLen, &configReq); tDeserializeSConfigReq(pReq->pCont, pReq->contLen, &configReq);
SArray *diffArray = taosArrayInit(16, sizeof(SConfigItem));
SStatusRsp statusRsp = {0}; SConfigRsp configRsp = {0};
statusRsp.statusSeq++; configRsp.forceReadConfig = configReq.forceReadConfig;
statusRsp.dnodeCfg.dnodeId = pDnode->id; if (configRsp.forceReadConfig) {
statusRsp.dnodeCfg.clusterId = pMnode->clusterId; // compare config array from configReq with current config array
statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp)); if (compareSConfigItemArrays(cfgGetGlobalCfg(tsCfg), configReq.array, diffArray)) {
if (statusRsp.pDnodeEps == NULL) { configRsp.array = diffArray;
terrno = TSDB_CODE_OUT_OF_MEMORY; } else {
goto _OVER; configRsp.isConifgVerified = 1;
taosArrayDestroy(diffArray);
}
} else {
configRsp.array = cfgGetGlobalCfg(tsCfg);
if (configReq.cver == tsConfigVersion) {
configRsp.isConifgVerified = 1;
} else {
configRsp.array = cfgGetGlobalCfg(tsCfg);
}
} }
mndGetDnodeEps(pMnode, statusRsp.pDnodeEps); int32_t contLen = tSerializeSConfigRsp(NULL, 0, &configRsp);
statusRsp.ipWhiteVer = pMnode->ipWhiteVer;
int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);
contLen = tSerializeSStatusRsp(pHead, contLen, &statusRsp); contLen = tSerializeSConfigRsp(pHead, contLen, &configRsp);
taosArrayDestroy(statusRsp.pDnodeEps); taosArrayDestroy(diffArray);
if (contLen < 0) { if (contLen < 0) {
code = contLen; code = contLen;
goto _OVER; goto _OVER;