Enh use tmsg to encode/decode global config.

This commit is contained in:
xiao-77 2024-12-12 17:40:35 +08:00
parent 95a72d14d5
commit d6078fd0b7
3 changed files with 199 additions and 160 deletions

View File

@ -324,13 +324,16 @@ typedef struct {
float fval;
int32_t i32;
int64_t i64;
char str[TSDB_CONFIG_VALUE_LEN];
char* str;
};
} SConfigObj;
int32_t tEncodeSConfigObj(SEncoder* pEncoder, const SConfigObj* pObj);
int32_t tDecodeSConfigObj(SDecoder* pDecoder, SConfigObj* pObj);
SConfigObj* mndInitConfigObj(SConfigItem* pItem);
SConfigObj* mndInitConfigVersion();
int32_t mndUpdateObj(SConfigObj* pObj, const char* name, char* value);
void tFreeSConfigObj(SConfigObj* obj);
typedef struct {
int32_t maxUsers;
@ -505,8 +508,8 @@ typedef struct {
int64_t dstTbUid;
int8_t intervalUnit;
int8_t slidingUnit;
int8_t timezone; // int8_t is not enough, timezone is unit of second
int32_t dstVgId; // for stream
int8_t timezone; // int8_t is not enough, timezone is unit of second
int32_t dstVgId; // for stream
int64_t interval;
int64_t offset;
int64_t sliding;

View File

@ -27,7 +27,6 @@
#define CFG_ALTER_TIMEOUT 3 * 1000
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue);
static int32_t cfgUpdateItem(SConfigItem *pItem, SConfigObj *obj);
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp);
@ -66,116 +65,105 @@ int32_t mndInitConfig(SMnode *pMnode) {
SSdbRaw *mnCfgActionEncode(SConfigObj *obj) {
int32_t code = 0;
int32_t lino = 0;
terrno = TSDB_CODE_OUT_OF_MEMORY;
char buf[30];
void *buf = NULL;
int32_t sz = sizeof(SConfigObj) + CFG_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CFG, CFG_VER_NUMBER, sz);
if (pRaw == NULL) goto _OVER;
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
if ((code = tEncodeSConfigObj(&encoder, obj)) < 0) {
tEncoderClear(&encoder);
TSDB_CHECK_CODE(code, lino, _over);
}
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
int32_t size = sizeof(int32_t) + tlen + CFG_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CFG, CFG_VER_NUMBER, size);
TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);
buf = taosMemoryMalloc(tlen);
TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeSConfigObj(&encoder, obj)) < 0) {
tEncoderClear(&encoder);
TSDB_CHECK_CODE(code, lino, _over);
}
tEncoderClear(&encoder);
int32_t dataPos = 0;
char name[CFG_NAME_MAX_LEN] = {0};
tstrncpy(name, obj->name, CFG_NAME_MAX_LEN);
SDB_SET_BINARY(pRaw, dataPos, name, CFG_NAME_MAX_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, obj->dtype, _OVER)
switch (obj->dtype) {
case CFG_DTYPE_NONE:
break;
case CFG_DTYPE_BOOL:
SDB_SET_BOOL(pRaw, dataPos, obj->bval, _OVER)
break;
case CFG_DTYPE_INT32:
SDB_SET_INT32(pRaw, dataPos, obj->i32, _OVER);
break;
case CFG_DTYPE_INT64:
SDB_SET_INT64(pRaw, dataPos, obj->i64, _OVER);
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
SDB_SET_FLOAT(pRaw, dataPos, obj->fval, _OVER)
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
SDB_SET_BINARY(pRaw, dataPos, obj->str, TSDB_CONFIG_VALUE_LEN, _OVER)
break;
}
SDB_SET_RESERVE(pRaw, dataPos, CFG_RESERVE_SIZE, _OVER)
SDB_SET_INT32(pRaw, dataPos, tlen, _over);
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
SDB_SET_DATALEN(pRaw, dataPos, _over);
terrno = 0;
_OVER:
if (terrno != 0) {
mError("cfg failed to encode to raw:%p since %s", pRaw, terrstr());
_over:
taosMemoryFreeClear(buf);
if (code != TSDB_CODE_SUCCESS) {
mError("cfg:%s, failed to encode to raw:%p at line:%d since %s", obj->name, pRaw, lino, tstrerror(code));
sdbFreeRaw(pRaw);
terrno = code;
return NULL;
}
mTrace("cfg encode to raw:%p, row:%p", pRaw, obj);
terrno = 0;
mTrace("cfg:%s, encode to raw:%p, row:%p", obj->name, pRaw, obj);
return pRaw;
}
SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw) {
int32_t code = 0;
int32_t lino = 0;
int32_t len = -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t code = 0;
int32_t lino = 0;
SSdbRow *pRow = NULL;
SConfigObj *obj = NULL;
SConfigObj *pObj = NULL;
void *buf = NULL;
int8_t sver = 0;
int32_t tlen;
int32_t dataPos = 0;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
code = sdbGetRawSoftVer(pRaw, &sver);
TSDB_CHECK_CODE(code, lino, _over);
if (sver != CFG_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
goto _over;
}
pRow = sdbAllocRow(sizeof(SConfigObj));
if (pRow == NULL) goto _OVER;
TSDB_CHECK_NULL(pRow, code, lino, _over, terrno);
obj = sdbGetRowObj(pRow);
if (obj == NULL) goto _OVER;
int32_t dataPos = 0;
pObj = sdbGetRowObj(pRow);
TSDB_CHECK_NULL(pObj, code, lino, _over, terrno);
SDB_GET_BINARY(pRaw, dataPos, obj->name, CFG_NAME_MAX_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, (int32_t *)&obj->dtype, _OVER)
switch (obj->dtype) {
case CFG_DTYPE_NONE:
break;
case CFG_DTYPE_BOOL:
SDB_GET_BOOL(pRaw, dataPos, &obj->bval, _OVER)
break;
case CFG_DTYPE_INT32:
SDB_GET_INT32(pRaw, dataPos, &obj->i32, _OVER);
break;
case CFG_DTYPE_INT64:
SDB_GET_INT64(pRaw, dataPos, &obj->i64, _OVER);
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
SDB_GET_FLOAT(pRaw, dataPos, &obj->fval, _OVER)
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
SDB_GET_BINARY(pRaw, dataPos, obj->str, TSDB_CONFIG_VALUE_LEN, _OVER)
break;
SDB_GET_INT32(pRaw, dataPos, &tlen, _over);
buf = taosMemoryMalloc(tlen + 1);
TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over);
SDecoder decoder;
tDecoderInit(&decoder, buf, tlen + 1);
code = tDecodeSConfigObj(&decoder, pObj);
tDecoderClear(&decoder);
if (code < 0) {
tFreeSConfigObj(pObj);
}
terrno = TSDB_CODE_SUCCESS;
_OVER:
if (terrno != 0) {
mError("cfg failed to decode from raw:%p since %s", pRaw, terrstr());
_over:
taosMemoryFreeClear(buf);
if (code != TSDB_CODE_SUCCESS) {
mError("cfg:%s, failed to decode from raw:%p since %s at:%d", pObj->name, pRaw, tstrerror(code), lino);
taosMemoryFreeClear(pRow);
terrno = code;
return NULL;
} else {
mTrace("config:%s, decode from raw:%p, row:%p", pObj->name, pRaw, pObj);
terrno = 0;
return pRow;
}
mTrace("cfg decode from raw:%p, row:%p", pRaw, obj);
return pRow;
}
static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigObj *obj) {
@ -211,7 +199,11 @@ static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigObj *pOld, SConfigObj *pNew
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
tstrncpy(pOld->str, pNew->str, TSDB_CONFIG_VALUE_LEN);
taosMemoryFree(pOld->str);
pOld->str = taosStrdup(pNew->str);
if (pOld->str == NULL) {
return terrno;
}
break;
}
return TSDB_CODE_SUCCESS;
@ -313,11 +305,10 @@ int32_t mndInitWriteCfg(SMnode *pMnode) {
SConfigObj *versionObj = mndInitConfigVersion();
if ((code = mndSetCreateConfigCommitLogs(pTrans, versionObj)) != 0) {
mError("failed to init mnd config version, since %s", tstrerror(code));
taosMemoryFree(versionObj->str);
taosMemoryFree(versionObj);
tFreeSConfigObj(versionObj);
goto _OVER;
}
taosMemoryFree(versionObj);
tFreeSConfigObj(versionObj);
sz = taosArrayGetSize(taosGetGlobalCfg(tsCfg));
for (int i = 0; i < sz; ++i) {
@ -329,10 +320,10 @@ int32_t mndInitWriteCfg(SMnode *pMnode) {
}
if ((code = mndSetCreateConfigCommitLogs(pTrans, obj)) != 0) {
mError("failed to init mnd config:%s, since %s", item->name, tstrerror(code));
taosMemoryFree(obj);
tFreeSConfigObj(obj);
goto _OVER;
}
taosMemoryFree(obj);
tFreeSConfigObj(obj);
}
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
@ -412,53 +403,6 @@ int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigObj *item) {
return TSDB_CODE_SUCCESS;
}
int32_t cfgUpdateItem(SConfigItem *pItem, SConfigObj *obj) {
int32_t code = TSDB_CODE_SUCCESS;
if (pItem == NULL || obj == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
switch (pItem->dtype) {
case CFG_DTYPE_BOOL: {
pItem->bval = obj->bval;
break;
}
case CFG_DTYPE_INT32: {
pItem->i32 = obj->i32;
break;
}
case CFG_DTYPE_INT64: {
pItem->i64 = obj->i64;
break;
}
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: {
pItem->fval = obj->fval;
break;
}
case CFG_DTYPE_DIR:
case CFG_DTYPE_TIMEZONE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_NONE:
case CFG_DTYPE_STRING: {
if (obj->str != NULL) {
taosMemoryFree(pItem->str);
pItem->str = taosStrdup(obj->str);
if (pItem->str == NULL) {
TAOS_RETURN(terrno);
}
}
break;
}
default:
code = TSDB_CODE_INVALID_CFG;
break;
}
TAOS_RETURN(code);
}
static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) {
int32_t code = 0;
char *p = pMCfgReq->config;
@ -682,26 +626,29 @@ _err:
static int32_t mndConfigUpdateTrans(SMnode *pMnode, const char *name, char *pValue, ECfgDataType dtype,
int32_t tsmmConfigVersion) {
int32_t code = -1;
int32_t lino = -1;
SConfigObj pVersion = {0}, pObj = {0};
int32_t code = -1;
int32_t lino = -1;
SConfigObj *pVersion = taosMemoryMalloc(sizeof(SConfigObj)), *pObj = taosMemoryMalloc(sizeof(SConfigObj));
if (pVersion == NULL || pObj == NULL) {
code = terrno;
goto _OVER;
}
tstrncpy(pVersion->name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
pVersion->dtype = CFG_DTYPE_INT32;
pVersion->i32 = tsmmConfigVersion;
tstrncpy(pVersion.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
pVersion.i32 = tsmmConfigVersion;
pVersion.dtype = CFG_DTYPE_INT32;
pObj->dtype = dtype;
tstrncpy(pObj->name, name, CFG_NAME_MAX_LEN);
pObj.dtype = dtype;
tstrncpy(pObj.name, name, CFG_NAME_MAX_LEN);
TAOS_CHECK_GOTO(mndUpdateObj(&pObj, name, pValue), &lino, _OVER);
TAOS_CHECK_GOTO(mndUpdateObj(pObj, name, pValue), &lino, _OVER);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-config");
if (pTrans == NULL) {
if (terrno != 0) code = terrno;
code = terrno;
goto _OVER;
}
mInfo("trans:%d, used to update config:%s to value:%s", pTrans->id, name, pValue);
TAOS_CHECK_GOTO(mndSetCreateConfigCommitLogs(pTrans, &pVersion), &lino, _OVER);
TAOS_CHECK_GOTO(mndSetCreateConfigCommitLogs(pTrans, &pObj), &lino, _OVER);
TAOS_CHECK_GOTO(mndSetCreateConfigCommitLogs(pTrans, pVersion), &lino, _OVER);
TAOS_CHECK_GOTO(mndSetCreateConfigCommitLogs(pTrans, pObj), &lino, _OVER);
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
code = 0;
_OVER:
@ -709,6 +656,8 @@ _OVER:
mError("failed to update config:%s to value:%s, since %s", name, pValue, tstrerror(code));
}
mndTransDrop(pTrans);
tFreeSConfigObj(pVersion);
tFreeSConfigObj(pObj);
return code;
}
@ -809,7 +758,7 @@ static void cfgObjArrayCleanUp(SArray *array) {
int32_t sz = taosArrayGetSize(array);
for (int32_t i = 0; i < sz; ++i) {
SConfigObj *obj = taosArrayGet(array, i);
taosMemoryFree(obj);
tFreeSConfigObj(obj);
}
taosArrayDestroy(array);
}

View File

@ -758,7 +758,11 @@ SConfigObj *mndInitConfigObj(SConfigItem *pItem) {
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
tstrncpy(pObj->str, pItem->str, TSDB_CONFIG_VALUE_LEN);
pObj->str = taosStrdup(pItem->str);
if (pObj->str == NULL) {
taosMemoryFree(pObj);
return NULL;
}
break;
}
return pObj;
@ -802,12 +806,15 @@ int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_STRING: {
strncpy(pObjNew->str, value, strlen(value));
pObjNew->str[strlen(value)] = 0;
pObjNew->str = taosStrdup(value);
if (pObjNew->str == NULL) {
code = terrno;
return code;
}
break;
}
case CFG_DTYPE_NONE:
break;
default:
code = TSDB_CODE_INVALID_CFG;
break;
@ -820,12 +827,92 @@ SConfigObj *mndInitConfigVersion() {
if (pObj == NULL) {
return NULL;
}
strncpy(pObj->name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
tstrncpy(pObj->name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
pObj->dtype = CFG_DTYPE_INT32;
pObj->i32 = 0;
return pObj;
}
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
TAOS_CHECK_RETURN(tStartEncode(pEncoder));
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));
switch (pObj->dtype) {
case CFG_DTYPE_BOOL:
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
break;
case CFG_DTYPE_INT32:
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
break;
case CFG_DTYPE_INT64:
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
if (pObj->str != NULL) {
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->str));
} else {
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, ""));
}
break;
default:
break;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
TAOS_CHECK_RETURN(tStartDecode(pDecoder));
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));
switch (pObj->dtype) {
case CFG_DTYPE_NONE:
break;
case CFG_DTYPE_BOOL:
TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
break;
case CFG_DTYPE_INT32:
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
break;
case CFG_DTYPE_INT64:
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE:
TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
break;
}
tEndDecode(pDecoder);
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
void tFreeSConfigObj(SConfigObj *obj) {
if (obj == NULL) {
return;
}
if (obj->dtype == CFG_DTYPE_STRING || obj->dtype == CFG_DTYPE_DIR || obj->dtype == CFG_DTYPE_LOCALE ||
obj->dtype == CFG_DTYPE_CHARSET || obj->dtype == CFG_DTYPE_TIMEZONE) {
taosMemoryFreeClear(obj->str);
}
taosMemoryFreeClear(obj);
}
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
// if (pEntryNew == NULL) return NULL;