From d6078fd0b7e41ecb1603113017cf81ed375f43a4 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Thu, 12 Dec 2024 17:40:35 +0800 Subject: [PATCH] Enh use tmsg to encode/decode global config. --- source/dnode/mnode/impl/inc/mndDef.h | 9 +- source/dnode/mnode/impl/src/mndConfig.c | 253 ++++++++++-------------- source/dnode/mnode/impl/src/mndDef.c | 97 ++++++++- 3 files changed, 199 insertions(+), 160 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index d2ab5d9687..0656bf8dc2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -324,13 +324,16 @@ typedef struct { float fval; int32_t i32; int64_t i64; - char str[TSDB_CONFIG_VALUE_LEN]; + char* str; }; } SConfigObj; +int32_t tEncodeSConfigObj(SEncoder* pEncoder, const SConfigObj* pObj); +int32_t tDecodeSConfigObj(SDecoder* pDecoder, SConfigObj* pObj); SConfigObj* mndInitConfigObj(SConfigItem* pItem); SConfigObj* mndInitConfigVersion(); int32_t mndUpdateObj(SConfigObj* pObj, const char* name, char* value); +void tFreeSConfigObj(SConfigObj* obj); typedef struct { int32_t maxUsers; @@ -505,8 +508,8 @@ typedef struct { int64_t dstTbUid; int8_t intervalUnit; int8_t slidingUnit; - int8_t timezone; // int8_t is not enough, timezone is unit of second - int32_t dstVgId; // for stream + int8_t timezone; // int8_t is not enough, timezone is unit of second + int32_t dstVgId; // for stream int64_t interval; int64_t offset; int64_t sliding; diff --git a/source/dnode/mnode/impl/src/mndConfig.c b/source/dnode/mnode/impl/src/mndConfig.c index a18e524581..84a9110bc5 100644 --- a/source/dnode/mnode/impl/src/mndConfig.c +++ b/source/dnode/mnode/impl/src/mndConfig.c @@ -27,7 +27,6 @@ #define CFG_ALTER_TIMEOUT 3 * 1000 static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t optLen, int32_t *pOutValue); -static int32_t cfgUpdateItem(SConfigItem *pItem, SConfigObj *obj); static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq); static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp); @@ -66,116 +65,105 @@ int32_t mndInitConfig(SMnode *pMnode) { SSdbRaw *mnCfgActionEncode(SConfigObj *obj) { int32_t code = 0; int32_t lino = 0; - terrno = TSDB_CODE_OUT_OF_MEMORY; - char buf[30]; + void *buf = NULL; - int32_t sz = sizeof(SConfigObj) + CFG_RESERVE_SIZE; - SSdbRaw *pRaw = sdbAllocRaw(SDB_CFG, CFG_VER_NUMBER, sz); - if (pRaw == NULL) goto _OVER; + SEncoder encoder; + tEncoderInit(&encoder, NULL, 0); + if ((code = tEncodeSConfigObj(&encoder, obj)) < 0) { + tEncoderClear(&encoder); + TSDB_CHECK_CODE(code, lino, _over); + } + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + int32_t size = sizeof(int32_t) + tlen + CFG_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_CFG, CFG_VER_NUMBER, size); + TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno); + + buf = taosMemoryMalloc(tlen); + TSDB_CHECK_NULL(buf, code, lino, _over, terrno); + + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeSConfigObj(&encoder, obj)) < 0) { + tEncoderClear(&encoder); + TSDB_CHECK_CODE(code, lino, _over); + } + + tEncoderClear(&encoder); int32_t dataPos = 0; - char name[CFG_NAME_MAX_LEN] = {0}; - tstrncpy(name, obj->name, CFG_NAME_MAX_LEN); - SDB_SET_BINARY(pRaw, dataPos, name, CFG_NAME_MAX_LEN, _OVER) - SDB_SET_INT32(pRaw, dataPos, obj->dtype, _OVER) - switch (obj->dtype) { - case CFG_DTYPE_NONE: - break; - case CFG_DTYPE_BOOL: - SDB_SET_BOOL(pRaw, dataPos, obj->bval, _OVER) - break; - case CFG_DTYPE_INT32: - SDB_SET_INT32(pRaw, dataPos, obj->i32, _OVER); - break; - case CFG_DTYPE_INT64: - SDB_SET_INT64(pRaw, dataPos, obj->i64, _OVER); - break; - case CFG_DTYPE_FLOAT: - case CFG_DTYPE_DOUBLE: - SDB_SET_FLOAT(pRaw, dataPos, obj->fval, _OVER) - break; - case CFG_DTYPE_STRING: - case CFG_DTYPE_DIR: - case CFG_DTYPE_LOCALE: - case CFG_DTYPE_CHARSET: - case CFG_DTYPE_TIMEZONE: - SDB_SET_BINARY(pRaw, dataPos, obj->str, TSDB_CONFIG_VALUE_LEN, _OVER) - break; - } - SDB_SET_RESERVE(pRaw, dataPos, CFG_RESERVE_SIZE, _OVER) + SDB_SET_INT32(pRaw, dataPos, tlen, _over); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over); + SDB_SET_DATALEN(pRaw, dataPos, _over); - terrno = 0; - -_OVER: - if (terrno != 0) { - mError("cfg failed to encode to raw:%p since %s", pRaw, terrstr()); +_over: + taosMemoryFreeClear(buf); + if (code != TSDB_CODE_SUCCESS) { + mError("cfg:%s, failed to encode to raw:%p at line:%d since %s", obj->name, pRaw, lino, tstrerror(code)); sdbFreeRaw(pRaw); + terrno = code; return NULL; } - mTrace("cfg encode to raw:%p, row:%p", pRaw, obj); + + terrno = 0; + mTrace("cfg:%s, encode to raw:%p, row:%p", obj->name, pRaw, obj); return pRaw; } SSdbRow *mndCfgActionDecode(SSdbRaw *pRaw) { - int32_t code = 0; - int32_t lino = 0; - int32_t len = -1; - terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t code = 0; + int32_t lino = 0; SSdbRow *pRow = NULL; - SConfigObj *obj = NULL; + SConfigObj *pObj = NULL; + void *buf = NULL; + int8_t sver = 0; + int32_t tlen; + int32_t dataPos = 0; - int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; + code = sdbGetRawSoftVer(pRaw, &sver); + TSDB_CHECK_CODE(code, lino, _over); if (sver != CFG_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - goto _OVER; + goto _over; } pRow = sdbAllocRow(sizeof(SConfigObj)); - if (pRow == NULL) goto _OVER; + TSDB_CHECK_NULL(pRow, code, lino, _over, terrno); - obj = sdbGetRowObj(pRow); - if (obj == NULL) goto _OVER; - int32_t dataPos = 0; + pObj = sdbGetRowObj(pRow); + TSDB_CHECK_NULL(pObj, code, lino, _over, terrno); - SDB_GET_BINARY(pRaw, dataPos, obj->name, CFG_NAME_MAX_LEN, _OVER) - SDB_GET_INT32(pRaw, dataPos, (int32_t *)&obj->dtype, _OVER) - switch (obj->dtype) { - case CFG_DTYPE_NONE: - break; - case CFG_DTYPE_BOOL: - SDB_GET_BOOL(pRaw, dataPos, &obj->bval, _OVER) - break; - case CFG_DTYPE_INT32: - SDB_GET_INT32(pRaw, dataPos, &obj->i32, _OVER); - break; - case CFG_DTYPE_INT64: - SDB_GET_INT64(pRaw, dataPos, &obj->i64, _OVER); - break; - case CFG_DTYPE_FLOAT: - case CFG_DTYPE_DOUBLE: - SDB_GET_FLOAT(pRaw, dataPos, &obj->fval, _OVER) - break; - case CFG_DTYPE_STRING: - case CFG_DTYPE_DIR: - case CFG_DTYPE_LOCALE: - case CFG_DTYPE_CHARSET: - case CFG_DTYPE_TIMEZONE: - SDB_GET_BINARY(pRaw, dataPos, obj->str, TSDB_CONFIG_VALUE_LEN, _OVER) - break; + SDB_GET_INT32(pRaw, dataPos, &tlen, _over); + + buf = taosMemoryMalloc(tlen + 1); + TSDB_CHECK_NULL(buf, code, lino, _over, terrno); + + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over); + + SDecoder decoder; + tDecoderInit(&decoder, buf, tlen + 1); + code = tDecodeSConfigObj(&decoder, pObj); + tDecoderClear(&decoder); + + if (code < 0) { + tFreeSConfigObj(pObj); } - terrno = TSDB_CODE_SUCCESS; -_OVER: - if (terrno != 0) { - mError("cfg failed to decode from raw:%p since %s", pRaw, terrstr()); +_over: + taosMemoryFreeClear(buf); + + if (code != TSDB_CODE_SUCCESS) { + mError("cfg:%s, failed to decode from raw:%p since %s at:%d", pObj->name, pRaw, tstrerror(code), lino); taosMemoryFreeClear(pRow); + terrno = code; return NULL; + } else { + mTrace("config:%s, decode from raw:%p, row:%p", pObj->name, pRaw, pObj); + terrno = 0; + return pRow; } - - mTrace("cfg decode from raw:%p, row:%p", pRaw, obj); - return pRow; } static int32_t mndCfgActionInsert(SSdb *pSdb, SConfigObj *obj) { @@ -211,7 +199,11 @@ static int32_t mndCfgActionUpdate(SSdb *pSdb, SConfigObj *pOld, SConfigObj *pNew case CFG_DTYPE_LOCALE: case CFG_DTYPE_CHARSET: case CFG_DTYPE_TIMEZONE: - tstrncpy(pOld->str, pNew->str, TSDB_CONFIG_VALUE_LEN); + taosMemoryFree(pOld->str); + pOld->str = taosStrdup(pNew->str); + if (pOld->str == NULL) { + return terrno; + } break; } return TSDB_CODE_SUCCESS; @@ -313,11 +305,10 @@ int32_t mndInitWriteCfg(SMnode *pMnode) { SConfigObj *versionObj = mndInitConfigVersion(); if ((code = mndSetCreateConfigCommitLogs(pTrans, versionObj)) != 0) { mError("failed to init mnd config version, since %s", tstrerror(code)); - taosMemoryFree(versionObj->str); - taosMemoryFree(versionObj); + tFreeSConfigObj(versionObj); goto _OVER; } - taosMemoryFree(versionObj); + tFreeSConfigObj(versionObj); sz = taosArrayGetSize(taosGetGlobalCfg(tsCfg)); for (int i = 0; i < sz; ++i) { @@ -329,10 +320,10 @@ int32_t mndInitWriteCfg(SMnode *pMnode) { } if ((code = mndSetCreateConfigCommitLogs(pTrans, obj)) != 0) { mError("failed to init mnd config:%s, since %s", item->name, tstrerror(code)); - taosMemoryFree(obj); + tFreeSConfigObj(obj); goto _OVER; } - taosMemoryFree(obj); + tFreeSConfigObj(obj); } if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER; @@ -412,53 +403,6 @@ int32_t mndSetCreateConfigCommitLogs(STrans *pTrans, SConfigObj *item) { return TSDB_CODE_SUCCESS; } -int32_t cfgUpdateItem(SConfigItem *pItem, SConfigObj *obj) { - int32_t code = TSDB_CODE_SUCCESS; - if (pItem == NULL || obj == NULL) { - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } - - switch (pItem->dtype) { - case CFG_DTYPE_BOOL: { - pItem->bval = obj->bval; - break; - } - case CFG_DTYPE_INT32: { - pItem->i32 = obj->i32; - break; - } - case CFG_DTYPE_INT64: { - pItem->i64 = obj->i64; - break; - } - case CFG_DTYPE_FLOAT: - case CFG_DTYPE_DOUBLE: { - pItem->fval = obj->fval; - break; - } - case CFG_DTYPE_DIR: - case CFG_DTYPE_TIMEZONE: - case CFG_DTYPE_CHARSET: - case CFG_DTYPE_LOCALE: - case CFG_DTYPE_NONE: - case CFG_DTYPE_STRING: { - if (obj->str != NULL) { - taosMemoryFree(pItem->str); - pItem->str = taosStrdup(obj->str); - if (pItem->str == NULL) { - TAOS_RETURN(terrno); - } - } - break; - } - default: - code = TSDB_CODE_INVALID_CFG; - break; - } - - TAOS_RETURN(code); -} - static int32_t mndMCfg2DCfg(SMCfgDnodeReq *pMCfgReq, SDCfgDnodeReq *pDCfgReq) { int32_t code = 0; char *p = pMCfgReq->config; @@ -682,26 +626,29 @@ _err: static int32_t mndConfigUpdateTrans(SMnode *pMnode, const char *name, char *pValue, ECfgDataType dtype, int32_t tsmmConfigVersion) { - int32_t code = -1; - int32_t lino = -1; - SConfigObj pVersion = {0}, pObj = {0}; + int32_t code = -1; + int32_t lino = -1; + SConfigObj *pVersion = taosMemoryMalloc(sizeof(SConfigObj)), *pObj = taosMemoryMalloc(sizeof(SConfigObj)); + if (pVersion == NULL || pObj == NULL) { + code = terrno; + goto _OVER; + } + tstrncpy(pVersion->name, "tsmmConfigVersion", CFG_NAME_MAX_LEN); + pVersion->dtype = CFG_DTYPE_INT32; + pVersion->i32 = tsmmConfigVersion; - tstrncpy(pVersion.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN); - pVersion.i32 = tsmmConfigVersion; - pVersion.dtype = CFG_DTYPE_INT32; + pObj->dtype = dtype; + tstrncpy(pObj->name, name, CFG_NAME_MAX_LEN); - pObj.dtype = dtype; - tstrncpy(pObj.name, name, CFG_NAME_MAX_LEN); - - TAOS_CHECK_GOTO(mndUpdateObj(&pObj, name, pValue), &lino, _OVER); + TAOS_CHECK_GOTO(mndUpdateObj(pObj, name, pValue), &lino, _OVER); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-config"); if (pTrans == NULL) { - if (terrno != 0) code = terrno; + code = terrno; goto _OVER; } mInfo("trans:%d, used to update config:%s to value:%s", pTrans->id, name, pValue); - TAOS_CHECK_GOTO(mndSetCreateConfigCommitLogs(pTrans, &pVersion), &lino, _OVER); - TAOS_CHECK_GOTO(mndSetCreateConfigCommitLogs(pTrans, &pObj), &lino, _OVER); + TAOS_CHECK_GOTO(mndSetCreateConfigCommitLogs(pTrans, pVersion), &lino, _OVER); + TAOS_CHECK_GOTO(mndSetCreateConfigCommitLogs(pTrans, pObj), &lino, _OVER); if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER; code = 0; _OVER: @@ -709,6 +656,8 @@ _OVER: mError("failed to update config:%s to value:%s, since %s", name, pValue, tstrerror(code)); } mndTransDrop(pTrans); + tFreeSConfigObj(pVersion); + tFreeSConfigObj(pObj); return code; } @@ -809,7 +758,7 @@ static void cfgObjArrayCleanUp(SArray *array) { int32_t sz = taosArrayGetSize(array); for (int32_t i = 0; i < sz; ++i) { SConfigObj *obj = taosArrayGet(array, i); - taosMemoryFree(obj); + tFreeSConfigObj(obj); } taosArrayDestroy(array); } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6d66addd6e..1cc547c585 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -758,7 +758,11 @@ SConfigObj *mndInitConfigObj(SConfigItem *pItem) { case CFG_DTYPE_LOCALE: case CFG_DTYPE_CHARSET: case CFG_DTYPE_TIMEZONE: - tstrncpy(pObj->str, pItem->str, TSDB_CONFIG_VALUE_LEN); + pObj->str = taosStrdup(pItem->str); + if (pObj->str == NULL) { + taosMemoryFree(pObj); + return NULL; + } break; } return pObj; @@ -802,12 +806,15 @@ int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) { case CFG_DTYPE_CHARSET: case CFG_DTYPE_LOCALE: case CFG_DTYPE_STRING: { - strncpy(pObjNew->str, value, strlen(value)); - pObjNew->str[strlen(value)] = 0; + pObjNew->str = taosStrdup(value); + if (pObjNew->str == NULL) { + code = terrno; + return code; + } break; } - case CFG_DTYPE_NONE: + break; default: code = TSDB_CODE_INVALID_CFG; break; @@ -820,12 +827,92 @@ SConfigObj *mndInitConfigVersion() { if (pObj == NULL) { return NULL; } - strncpy(pObj->name, "tsmmConfigVersion", CFG_NAME_MAX_LEN); + tstrncpy(pObj->name, "tsmmConfigVersion", CFG_NAME_MAX_LEN); pObj->dtype = CFG_DTYPE_INT32; pObj->i32 = 0; return pObj; } +int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) { + TAOS_CHECK_RETURN(tStartEncode(pEncoder)); + TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name)); + + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype)); + switch (pObj->dtype) { + case CFG_DTYPE_BOOL: + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval)); + break; + case CFG_DTYPE_INT32: + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32)); + break; + case CFG_DTYPE_INT64: + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64)); + break; + case CFG_DTYPE_FLOAT: + case CFG_DTYPE_DOUBLE: + TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval)); + break; + case CFG_DTYPE_STRING: + case CFG_DTYPE_DIR: + case CFG_DTYPE_LOCALE: + case CFG_DTYPE_CHARSET: + case CFG_DTYPE_TIMEZONE: + if (pObj->str != NULL) { + TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->str)); + } else { + TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, "")); + } + break; + default: + break; + } + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) { + TAOS_CHECK_RETURN(tStartDecode(pDecoder)); + TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype)); + switch (pObj->dtype) { + case CFG_DTYPE_NONE: + break; + case CFG_DTYPE_BOOL: + TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval)); + break; + case CFG_DTYPE_INT32: + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32)); + break; + case CFG_DTYPE_INT64: + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64)); + break; + case CFG_DTYPE_FLOAT: + case CFG_DTYPE_DOUBLE: + TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval)); + break; + case CFG_DTYPE_STRING: + case CFG_DTYPE_DIR: + case CFG_DTYPE_LOCALE: + case CFG_DTYPE_CHARSET: + case CFG_DTYPE_TIMEZONE: + TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str)); + break; + } + tEndDecode(pDecoder); + TAOS_RETURN(TSDB_CODE_SUCCESS); +} + +void tFreeSConfigObj(SConfigObj *obj) { + if (obj == NULL) { + return; + } + if (obj->dtype == CFG_DTYPE_STRING || obj->dtype == CFG_DTYPE_DIR || obj->dtype == CFG_DTYPE_LOCALE || + obj->dtype == CFG_DTYPE_CHARSET || obj->dtype == CFG_DTYPE_TIMEZONE) { + taosMemoryFreeClear(obj->str); + } + taosMemoryFreeClear(obj); +} + // SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { // SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); // if (pEntryNew == NULL) return NULL;